You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/01 19:27:19 UTC

[kafka] branch trunk updated: KAFKA-8410: Revert Part 1: processor context bounds (#8414) (#8595)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd095aa  KAFKA-8410: Revert Part 1: processor context bounds (#8414) (#8595)
fd095aa is described below

commit fd095aaafdd207162cddf293b17f249875b9a532
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 1 14:26:36 2020 -0500

    KAFKA-8410: Revert Part 1: processor context bounds (#8414) (#8595)
    
    This reverts commit 29e08fd2c2d3349ba5cbd8fe5a9d35a0cea02b85.
    Adding the generic parameters turned out to cause more problems than expected.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../examples/docs/DeveloperGuideTesting.java       |  4 +-
 .../apache/kafka/streams/kstream/Transformer.java  |  2 +-
 .../kafka/streams/kstream/ValueTransformer.java    |  2 +-
 .../streams/kstream/ValueTransformerWithKey.java   |  2 +-
 .../streams/kstream/internals/AbstractStream.java  |  2 +-
 .../kstream/internals/KStreamFlatTransform.java    |  2 +-
 .../internals/KStreamFlatTransformValues.java      |  4 +-
 .../internals/KStreamKTableJoinProcessor.java      |  5 +-
 .../kstream/internals/KStreamTransformValues.java  |  4 +-
 .../kstream/internals/KTableKTableInnerJoin.java   |  7 ++-
 .../kstream/internals/KTableKTableLeftJoin.java    |  7 ++-
 .../kstream/internals/KTableKTableOuterJoin.java   |  7 ++-
 .../kstream/internals/KTableKTableRightJoin.java   |  7 ++-
 .../streams/kstream/internals/KTableMapValues.java |  2 +-
 .../kstream/internals/KTableRepartitionMap.java    |  4 +-
 .../kstream/internals/KTableTransformValues.java   |  2 +-
 .../kstream/internals/KTableValueGetter.java       |  2 +-
 .../internals/SessionCacheFlushListener.java       | 10 ++--
 .../kstream/internals/SessionTupleForwarder.java   |  4 +-
 .../internals/TimestampedCacheFlushListener.java   | 10 ++--
 .../internals/TimestampedTupleForwarder.java       |  4 +-
 .../internals/TransformerSupplierAdapter.java      |  2 +-
 .../ForeignJoinSubscriptionProcessorSupplier.java  |  5 +-
 .../SubscriptionJoinForeignProcessorSupplier.java  |  5 +-
 .../SubscriptionStoreReceiveProcessorSupplier.java |  5 +-
 .../suppress/KTableSuppressProcessorSupplier.java  |  6 +--
 .../kafka/streams/processor/AbstractProcessor.java |  6 +--
 .../apache/kafka/streams/processor/Processor.java  |  2 +-
 .../kafka/streams/processor/ProcessorContext.java  | 16 +++---
 .../internals/AbstractProcessorContext.java        |  2 +-
 .../ForwardingDisabledProcessorContext.java        | 16 +++---
 .../internals/GlobalProcessorContextImpl.java      | 10 ++--
 .../processor/internals/GlobalStateUpdateTask.java |  4 +-
 .../processor/internals/GlobalStreamThread.java    | 12 ++---
 .../internals/InternalProcessorContext.java        |  2 +-
 .../processor/internals/ProcessorContextImpl.java  | 23 +++++---
 .../streams/processor/internals/ProcessorNode.java |  2 +-
 .../streams/processor/internals/SourceNode.java    |  2 +-
 .../processor/internals/StandbyContextImpl.java    | 16 +++---
 .../streams/processor/internals/StreamTask.java    |  8 +--
 .../kafka/streams/TopologyTestDriverWrapper.java   |  8 +--
 .../internals/KStreamFlatTransformTest.java        |  2 +-
 .../internals/KStreamFlatTransformValuesTest.java  |  2 +-
 .../kstream/internals/KStreamPrintTest.java        |  2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |  3 +-
 .../kstream/internals/KStreamTransformTest.java    |  4 +-
 .../kstream/internals/KTableFilterTest.java        |  5 +-
 .../kstream/internals/KTableMapValuesTest.java     |  5 +-
 .../kstream/internals/KTableSourceTest.java        |  3 +-
 .../internals/KTableTransformValuesTest.java       |  6 +--
 .../internals/SessionCacheFlushListenerTest.java   |  2 +-
 .../internals/SessionTupleForwarderTest.java       |  4 +-
 .../TimestampedCacheFlushListenerTest.java         |  4 +-
 .../internals/TimestampedTupleForwarderTest.java   |  4 +-
 .../internals/AbstractProcessorContextTest.java    |  2 +-
 .../ForwardingDisabledProcessorContextTest.java    | 61 ++++++++++++++++++++++
 .../internals/GlobalProcessorContextImplTest.java  |  4 +-
 .../internals/ProcessorContextImplTest.java        |  8 +--
 .../processor/internals/StreamThreadTest.java      |  2 +-
 .../streams/state/KeyValueStoreTestDriver.java     |  2 +-
 .../state/internals/AbstractKeyValueStoreTest.java |  2 +-
 .../state/internals/CachingKeyValueStoreTest.java  |  2 +-
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  2 +-
 .../state/internals/InMemoryKeyValueStoreTest.java |  2 +-
 .../state/internals/InMemoryLRUCacheStoreTest.java |  2 +-
 .../state/internals/RocksDBKeyValueStoreTest.java  |  2 +-
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  2 +-
 .../kafka/test/MockInternalProcessorContext.java   | 10 ++--
 .../java/org/apache/kafka/test/MockProcessor.java  |  2 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |  2 +-
 .../apache/kafka/test/NoOpProcessorContext.java    |  2 +-
 .../streams/scala/FunctionsCompatConversions.scala |  6 +--
 .../apache/kafka/streams/scala/TopologyTest.scala  |  4 +-
 .../kafka/streams/scala/kstream/KStreamTest.scala  |  8 +--
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +-
 .../streams/processor/MockProcessorContext.java    |  2 +-
 .../kafka/streams/TopologyTestDriverTest.java      |  8 +--
 78 files changed, 242 insertions(+), 192 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index f962f08..28fccfa 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -153,12 +153,12 @@ public class DeveloperGuideTesting {
     }
 
     public static class CustomMaxAggregator implements Processor<String, Long> {
-        ProcessorContext<Object, Object> context;
+        ProcessorContext context;
         private KeyValueStore<String, Long> store;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             this.context = context;
             context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
             context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 9015d38..af8e87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -64,7 +64,7 @@ public interface Transformer<K, V, R> {
      *
      * @param context the context
      */
-    void init(final ProcessorContext<Object, Object> context);
+    void init(final ProcessorContext context);
 
     /**
      * Transform the record with the given key and value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index aae53be..987cae5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -69,7 +69,7 @@ public interface ValueTransformer<V, VR> {
      * @throws IllegalStateException If store gets registered after initialization is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(final ProcessorContext<Void, Void> context);
+    void init(final ProcessorContext context);
 
     /**
      * Transform the given value to a new value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 55d61dd..be37b0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -72,7 +72,7 @@ public interface ValueTransformerWithKey<K, V, VR> {
      * @throws IllegalStateException If store gets registered after initialization is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(final ProcessorContext<Void, Void> context);
+    void init(final ProcessorContext context);
 
     /**
      * Transform the given [key and ]value to a new value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 68a254a..3c6b591 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -111,7 +111,7 @@ public abstract class AbstractStream<K, V> {
             final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
             return new ValueTransformerWithKey<K, V, VR>() {
                 @Override
-                public void init(final ProcessorContext<Void, Void> context) {
+                public void init(final ProcessorContext context) {
                     valueTransformer.init(context);
                 }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
index 097db8a..10ef15f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
@@ -46,7 +46,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             transformer.init(context);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
index bb5a651..40e4b37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
@@ -39,14 +39,14 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupp
     public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
 
         private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
-        private ProcessorContext<Object, Object> context;
+        private ProcessorContext context;
 
         KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
             this.valueTransformer = valueTransformer;
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
             this.context = context;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index d93674b..d0ce634 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,11 +49,11 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
     }
 
     @Override
-    public void init(final ProcessorContext<Object, Object> context) {
+    public void init(final ProcessorContext context) {
         super.init(context);
         metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-        valueGetter.init(new ForwardingDisabledProcessorContext(context));
+        valueGetter.init(context);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index dbe3048f..843606b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -39,14 +39,14 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
     public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
 
         private final ValueTransformerWithKey<K, V, R> valueTransformer;
-        private ProcessorContext<Object, Object> context;
+        private ProcessorContext context;
 
         KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) {
             this.valueTransformer = valueTransformer;
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
             this.context = context;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 7df2029..06701c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -76,11 +75,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            valueGetter.init(new ForwardingDisabledProcessorContext(context));
+            valueGetter.init(context);
         }
 
         @Override
@@ -136,7 +135,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             valueGetter1.init(context);
             valueGetter2.init(context);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 420055c..b6fd894 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -75,11 +74,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            valueGetter.init(new ForwardingDisabledProcessorContext(context));
+            valueGetter.init(context);
         }
 
         @Override
@@ -142,7 +141,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             valueGetter1.init(context);
             valueGetter2.init(context);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 3790107..391255d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -74,11 +73,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            valueGetter.init(new ForwardingDisabledProcessorContext(context));
+            valueGetter.init(context);
         }
 
         @Override
@@ -137,7 +136,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             valueGetter1.init(context);
             valueGetter2.init(context);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 9c93dc7..f499c2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -73,11 +72,11 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            valueGetter.init(new ForwardingDisabledProcessorContext(context));
+            valueGetter.init(context);
         }
 
         @Override
@@ -133,7 +132,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             valueGetter1.init(context);
             valueGetter2.init(context);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index b58c6ee..e734457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -135,7 +135,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index c1dedbd..a3d8b79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -104,14 +104,14 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
     private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
 
         private final KTableValueGetter<K, V> parentGetter;
-        private ProcessorContext<Void, Void> context;
+        private ProcessorContext context;
 
         KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             this.context = context;
             parentGetter.init(context);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index decf96e..86da063 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -134,7 +134,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index 6479690..a2695d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 public interface KTableValueGetter<K, V> {
 
-    void init(ProcessorContext<Void, Void> context);
+    void init(ProcessorContext context);
 
     ValueAndTimestamp<V> get(K key);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index 3fe82c5..f40fdfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -24,11 +24,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
 class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext<Object, Object> context;
-    private final ProcessorNode<?, ?> myNode;
+    private final InternalProcessorContext context;
+    private final ProcessorNode myNode;
 
-    SessionCacheFlushListener(final ProcessorContext<Object, Object> context) {
-        this.context = (InternalProcessorContext<Object, Object>) context;
+    SessionCacheFlushListener(final ProcessorContext context) {
+        this.context = (InternalProcessorContext) context;
         myNode = this.context.currentNode();
     }
 
@@ -37,7 +37,7 @@ class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>,
                       final V newValue,
                       final V oldValue,
                       final long timestamp) {
-        final ProcessorNode<?, ?> prev = context.currentNode();
+        final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
             context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
index ee49979..bad255a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
@@ -32,13 +32,13 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  * @param <V>
  */
 class SessionTupleForwarder<K, V> {
-    private final ProcessorContext<Object, Object> context;
+    private final ProcessorContext context;
     private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
     SessionTupleForwarder(final StateStore store,
-                          final ProcessorContext<Object, Object> context,
+                          final ProcessorContext context,
                           final CacheFlushListener<Windowed<K>, V> flushListener,
                           final boolean sendOldValues) {
         this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index ec9135e..5540376 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -26,11 +26,11 @@ import org.apache.kafka.streams.state.internals.CacheFlushListener;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
 class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
-    private final InternalProcessorContext<Object, Object> context;
-    private final ProcessorNode<?, ?> myNode;
+    private final InternalProcessorContext context;
+    private final ProcessorNode myNode;
 
-    TimestampedCacheFlushListener(final ProcessorContext<Object, Object> context) {
-        this.context = (InternalProcessorContext<Object, Object>) context;
+    TimestampedCacheFlushListener(final ProcessorContext context) {
+        this.context = (InternalProcessorContext) context;
         myNode = this.context.currentNode();
     }
 
@@ -39,7 +39,7 @@ class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, Value
                       final ValueAndTimestamp<V> newValue,
                       final ValueAndTimestamp<V> oldValue,
                       final long timestamp) {
-        final ProcessorNode<?, ?> prev = context.currentNode();
+        final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
             context.forward(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 979798e..910dd8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -30,13 +30,13 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  * @param <V> the type of the value
  */
 class TimestampedTupleForwarder<K, V> {
-    private final ProcessorContext<Object, Object> context;
+    private final ProcessorContext context;
     private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
     TimestampedTupleForwarder(final StateStore store,
-                              final ProcessorContext<Object, Object> context,
+                              final ProcessorContext context,
                               final TimestampedCacheFlushListener<K, V> flushListener,
                               final boolean sendOldValues) {
         this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java
index 0d828d3..7d0bf7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java
@@ -38,7 +38,7 @@ public class TransformerSupplierAdapter<KIn, VIn, KOut, VOut> implements Transfo
             private Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = transformerSupplier.get();
 
             @Override
-            public void init(final ProcessorContext<Object, Object> context) {
+            public void init(final ProcessorContext context) {
                 transformer.init(context);
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index 975c1df..fd95105 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -61,10 +61,9 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc
         private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
-            final InternalProcessorContext<Object, Object> internalProcessorContext =
-                (InternalProcessorContext<Object, Object>) context;
+            final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
             droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                 Thread.currentThread().getName(),
                 internalProcessorContext.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
index e6c5347..2544eb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Objects;
@@ -57,10 +56,10 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
             private KTableValueGetter<KO, VO> foreignValues;
 
             @Override
-            public void init(final ProcessorContext<Object, Object> context) {
+            public void init(final ProcessorContext context) {
                 super.init(context);
                 foreignValues = foreignValueGetterSupplier.get();
-                foreignValues.init(new ForwardingDisabledProcessorContext(context));
+                foreignValues.init(context);
             }
 
             @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 344311b..61fb1c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -58,10 +58,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
             private Sensor droppedRecordsSensor;
 
             @Override
-            public void init(final ProcessorContext<Object, Object> context) {
+            public void init(final ProcessorContext context) {
                 super.init(context);
-                final InternalProcessorContext<Object, Object> internalProcessorContext =
-                    (InternalProcessorContext<Object, Object>) context;
+                final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
 
                 droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                     Thread.currentThread().getName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 3ad4bc1..61f013c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -121,7 +121,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
         private final String storeName;
 
         private TimeOrderedKeyValueBuffer<K, V> buffer;
-        private InternalProcessorContext<Object, Object> internalProcessorContext;
+        private InternalProcessorContext internalProcessorContext;
         private Sensor suppressionEmitSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
@@ -138,8 +138,8 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
-            internalProcessorContext = (InternalProcessorContext<Object, Object>) context;
+        public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
             suppressionEmitSensor = ProcessorNodeMetrics.suppressionEmitSensor(
                 Thread.currentThread().getName(),
                 context.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index fa187a8..83abfca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -25,13 +25,13 @@ package org.apache.kafka.streams.processor;
  */
 public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
 
-    private ProcessorContext<Object, Object> context;
+    private ProcessorContext context;
 
     protected AbstractProcessor() {
     }
 
     @Override
-    public void init(final ProcessorContext<Object, Object> context) {
+    public void init(final ProcessorContext context) {
         this.context = context;
     }
 
@@ -51,7 +51,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
      *
      * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
      */
-    protected final ProcessorContext<Object, Object> context() {
+    protected final ProcessorContext context() {
         return context;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index bd0814d..4046f2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -37,7 +37,7 @@ public interface Processor<K, V> {
      * 
      * @param context the context; may not be null
      */
-    void init(ProcessorContext<Object, Object> context);
+    void init(ProcessorContext context);
 
     /**
      * Process the record with the given key and value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 50ab83c..1971c67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -27,11 +27,8 @@ import java.util.Map;
 
 /**
  * Processor context interface.
- *
- * @param <K> the type of input keys that can be forwarded
- * @param <V> the type of input values that can be forwarded
  */
-public interface ProcessorContext<K, V> {
+public interface ProcessorContext {
 
     /**
      * Returns the application id
@@ -161,12 +158,11 @@ public interface ProcessorContext<K, V> {
      * @param interval the time interval between punctuations (supported minimum is 1 millisecond)
      * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
      * @param callback a function consuming timestamps representing the current stream or system time
-     * @throws IllegalArgumentException if the interval is under 1 millisecond
      * @return a handle allowing cancellation of the punctuation schedule established by this method
      */
     Cancellable schedule(final Duration interval,
                          final PunctuationType type,
-                         final Punctuator callback);
+                         final Punctuator callback) throws IllegalArgumentException;
 
     /**
      * Forwards a key/value pair to all downstream processors.
@@ -175,7 +171,7 @@ public interface ProcessorContext<K, V> {
      * @param key key
      * @param value value
      */
-    <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value);
+    <K, V> void forward(final K key, final V value);
 
     /**
      * Forwards a key/value pair to the specified downstream processors.
@@ -185,7 +181,7 @@ public interface ProcessorContext<K, V> {
      * @param value value
      * @param to the options to use when forwarding
      */
-    <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to);
+    <K, V> void forward(final K key, final V value, final To to);
 
     /**
      * Forwards a key/value pair to one of the downstream processors designated by childIndex
@@ -196,7 +192,7 @@ public interface ProcessorContext<K, V> {
      */
     // TODO when we remove this method, we can also remove `ProcessorNode#children`
     @Deprecated
-    <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex);
+    <K, V> void forward(final K key, final V value, final int childIndex);
 
     /**
      * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
@@ -206,7 +202,7 @@ public interface ProcessorContext<K, V> {
      * @deprecated please use {@link #forward(Object, Object, To)} instead
      */
     @Deprecated
-    <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
+    <K, V> void forward(final K key, final V value, final String childName);
 
     /**
      * Requests a commit
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 70831ed..71dd68a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 
 
-public abstract class AbstractProcessorContext<K, V> implements InternalProcessorContext<K, V> {
+public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
     public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 4b2aa5f..ba39368 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -37,10 +37,10 @@ import java.util.Objects;
 /**
  * {@code ProcessorContext} implementation that will throw on any forward call.
  */
-public final class ForwardingDisabledProcessorContext implements ProcessorContext<Void, Void> {
-    private final ProcessorContext<?, ?> delegate;
+public final class ForwardingDisabledProcessorContext implements ProcessorContext {
+    private final ProcessorContext delegate;
 
-    public ForwardingDisabledProcessorContext(final ProcessorContext<?, ?> delegate) {
+    public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
         this.delegate = Objects.requireNonNull(delegate, "delegate");
     }
 
@@ -96,29 +96,29 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
-                                final Punctuator callback) {
+                                final Punctuator callback) throws IllegalArgumentException {
         return delegate.schedule(interval, type, callback);
     }
 
     @Override
-    public void forward(final Void key, final Void value) {
+    public <K, V> void forward(final K key, final V value) {
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
 
     @Override
-    public void forward(final Void key, final Void value, final To to) {
+    public <K, V> void forward(final K key, final V value, final To to) {
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
 
     @Override
     @Deprecated
-    public void forward(final Void key, final Void value, final int childIndex) {
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
 
     @Override
     @Deprecated
-    public void forward(final Void key, final Void value, final String childName) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index ff5c26a..859430c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
 
-public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> {
+public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
 
     public GlobalProcessorContextImpl(final StreamsConfig config,
@@ -69,7 +69,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value) {
+    public <K, V> void forward(final K key, final V value) {
         final ProcessorNode<?, ?> previousNode = currentNode();
         try {
             for (final ProcessorNode<?, ?> child :  currentNode().children()) {
@@ -85,7 +85,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
      * No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes
      */
     @Override
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to) {
+    public <K, V> void forward(final K key, final V value, final To to) {
         if (!currentNode().children().isEmpty()) {
             throw new IllegalStateException("This method should only be called on 'GlobalStateStore.flush' that should not have any children.");
         }
@@ -96,7 +96,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
      */
     @Override
     @Deprecated
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex) {
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
@@ -105,7 +105,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
      */
     @Override
     @Deprecated
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index dcb3335..ddef7a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -35,7 +35,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
 public class GlobalStateUpdateTask implements GlobalStateMaintainer {
 
     private final ProcessorTopology topology;
-    private final InternalProcessorContext<Object, Object> processorContext;
+    private final InternalProcessorContext processorContext;
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
     private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
     private final GlobalStateManager stateMgr;
@@ -43,7 +43,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     private final LogContext logContext;
 
     public GlobalStateUpdateTask(final ProcessorTopology topology,
-                                 final InternalProcessorContext<Object, Object> processorContext,
+                                 final InternalProcessorContext processorContext,
                                  final GlobalStateManager stateMgr,
                                  final DeserializationExceptionHandler deserializationExceptionHandler,
                                  final LogContext logContext) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index c681825..923480f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -320,13 +320,11 @@ public class GlobalStreamThread extends Thread {
                 stateRestoreListener,
                 config);
 
-            final GlobalProcessorContextImpl<Object, Object> globalProcessorContext =
-                new GlobalProcessorContextImpl<>(
-                    config,
-                    stateMgr,
-                    streamsMetrics,
-                    cache
-                );
+            final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
+                config,
+                stateMgr,
+                streamsMetrics,
+                cache);
             stateMgr.setGlobalProcessorContext(globalProcessorContext);
 
             final StateConsumer stateConsumer = new StateConsumer(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index d25ce30..3e00ab2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext<K, V> extends ProcessorContext<K, V> {
+public interface InternalProcessorContext extends ProcessorContext {
 
     @Override
     StreamsMetricsImpl metrics();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 0e1b07a..d390af5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -45,7 +45,7 @@ import java.util.List;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
-public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> implements RecordCollector.Supplier {
+public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private final StreamTask task;
     private final RecordCollector collector;
@@ -126,13 +126,16 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
     }
 
     @Override
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value) {
+    public <K, V> void forward(final K key,
+                               final V value) {
         forward(key, value, SEND_TO_ALL);
     }
 
     @Override
     @Deprecated
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex) {
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final int childIndex) {
         forward(
             key,
             value,
@@ -141,13 +144,17 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
 
     @Override
     @Deprecated
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName) {
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final String childName) {
         forward(key, value, To.child(childName));
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to) {
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final To to) {
         final ProcessorNode<?, ?> previousNode = currentNode();
         final ProcessorRecordContext previousContext = recordContext;
 
@@ -182,7 +189,9 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
         }
     }
 
-    private <K1 extends K, V1 extends V> void forward(final ProcessorNode<K1, V1> child, final K1 key, final V1 value) {
+    private <K, V> void forward(final ProcessorNode<K, V> child,
+                                final K key,
+                                final V value) {
         setCurrentNode(child);
         child.process(key, value);
     }
@@ -207,7 +216,7 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
-                                final Punctuator callback) {
+                                final Punctuator callback) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
         return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8b72ee3..f37ff2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -87,7 +87,7 @@ public class ProcessorNode<K, V> {
         childByName.put(child.name, child);
     }
 
-    public void init(final InternalProcessorContext<Object, Object> context) {
+    public void init(final InternalProcessorContext context) {
         try {
             internalProcessorContext = context;
             initSensors();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index c43afab..853520a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -30,7 +30,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
     private final List<String> topics;
 
-    private ProcessorContext<Object, Object> context;
+    private ProcessorContext context;
     private Deserializer<K> keyDeserializer;
     private Deserializer<V> valDeserializer;
     private final TimestampExtractor timestampExtractor;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index f866521..9a94ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
 
-class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements RecordCollector.Supplier {
+class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
@@ -104,7 +104,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public void forward(final Void key, final Void value) {
+    public <K, V> void forward(final K key, final V value) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
@@ -112,7 +112,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public void forward(final Void key, final Void value, final To to) {
+    public <K, V> void forward(final K key, final V value, final To to) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
@@ -121,7 +121,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
      */
     @Override
     @Deprecated
-    public void forward(final Void key, final Void value, final int childIndex) {
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
@@ -130,7 +130,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
      */
     @Override
     @Deprecated
-    public void forward(final Void key, final Void value, final String childName) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
@@ -155,7 +155,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
 
@@ -176,7 +176,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
     }
 
     @Override
-    public void setCurrentNode(final ProcessorNode<?, ?> currentNode) {
+    public void setCurrentNode(final ProcessorNode currentNode) {
         // no-op. can't throw as this is called on commit when the StateStores get flushed.
     }
 
@@ -184,7 +184,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public ProcessorNode<?, ?> currentNode() {
+    public ProcessorNode currentNode() {
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 51604e7..1d513a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -97,7 +97,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
     private final Sensor enforcedProcessingSensor;
-    private final InternalProcessorContext<Object, Object> processorContext;
+    private final InternalProcessorContext processorContext;
 
     private long idleStartTimeMs;
     private boolean commitNeeded = false;
@@ -153,7 +153,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
         // initialize the topology with its own context
-        processorContext = new ProcessorContextImpl<>(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache);
+        processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache);
 
         final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
@@ -936,7 +936,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    public ProcessorContext<Object, Object> context() {
+    public ProcessorContext context() {
         return processorContext;
     }
 
@@ -1007,7 +1007,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return recordCollector;
     }
 
-    InternalProcessorContext<Object, Object> processorContext() {
+    InternalProcessorContext processorContext() {
         return processorContext;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
index 993a2ce..5c73bb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Properties;
@@ -42,9 +42,9 @@ public class TopologyTestDriverWrapper extends TopologyTestDriver {
      * @param processorName processor name to set as current node
      * @return the processor context
      */
-    public ProcessorContext<Object, Object> setCurrentNodeForProcessorContext(final String processorName) {
-        final ProcessorContext<Object, Object> context = task.context();
-        ((InternalProcessorContext<Object, Object>) context).setCurrentNode(getProcessor(processorName));
+    public ProcessorContext setCurrentNodeForProcessorContext(final String processorName) {
+        final ProcessorContext context = task.context();
+        ((ProcessorContextImpl) context).setCurrentNode(getProcessor(processorName));
         return context;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 54baf2c..d18a7a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -38,7 +38,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
     private Number inputValue;
 
     private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
-    private ProcessorContext<Object, Object> context;
+    private ProcessorContext context;
 
     private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index 2b8d44f..36167c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -38,7 +38,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
     private Integer inputValue;
 
     private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
-    private ProcessorContext<Object, Object> context;
+    private ProcessorContext context;
 
     private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 07f43be..9906556 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -45,7 +45,7 @@ public class KStreamPrintTest {
             "test-stream"));
 
         printProcessor = kStreamPrint.get();
-        final ProcessorContext<Object, Object> processorContext = EasyMock.createNiceMock(ProcessorContext.class);
+        final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
         EasyMock.replay(processorContext);
 
         printProcessor.init(processorContext);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 9bdae4e..d3fe80a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.To;
@@ -299,7 +298,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @Test
     public void shouldGetAggregatedValuesFromValueGetter() {
         final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
-        getter.init(new ForwardingDisabledProcessorContext(context));
+        getter.init(context);
         context.setTime(0);
         processor.process("a", "1");
         context.setTime(GAP_MS + 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 521ad5e..1f37b3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -56,7 +56,7 @@ public class KStreamTransformTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext<Object, Object> context) {
+                public void init(final ProcessorContext context) {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
@@ -122,7 +122,7 @@ public class KStreamTransformTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext<Object, Object> context) {
+                public void init(final ProcessorContext context) {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index eaf6f77..f4c1854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.TestInputTopic;
@@ -151,8 +150,8 @@ public class KTableFilterTest {
             final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
             final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
 
-            getter2.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table2.name)));
-            getter3.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table3.name)));
+            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
 
             inputTopic.pipeInput("A", 1, 5L);
             inputTopic.pipeInput("B", 1, 10L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 346fded..28b0342 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -122,8 +121,8 @@ public class KTableMapValuesTest {
             final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
             final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
 
-            getter2.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table2.name)));
-            getter3.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table3.name)));
+            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
 
             inputTopic1.pipeInput("A", "01", 50L);
             inputTopic1.pipeInput("B", "01", 10L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index fc6ac7a..236ea58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -189,7 +188,7 @@ public class KTableSourceTest {
                     Duration.ZERO
                 );
             final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table1.name)));
+            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
 
             inputTopic1.pipeInput("A", "01", 10L);
             inputTopic1.pipeInput("B", "01", 20L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index c850e7f..80cdae4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -84,7 +84,7 @@ public class KTableTransformValuesTest {
     @Mock(MockType.NICE)
     private KTableImpl<String, String, String> parent;
     @Mock(MockType.NICE)
-    private InternalProcessorContext<Object, Object> context;
+    private InternalProcessorContext context;
     @Mock(MockType.NICE)
     private KTableValueGetterSupplier<String, String> parentGetterSupplier;
     @Mock(MockType.NICE)
@@ -207,7 +207,7 @@ public class KTableTransformValuesTest {
         replay(parent, parentGetterSupplier, parentGetter);
 
         final KTableValueGetter<String, String> getter = transformValues.view().get();
-        getter.init(new ForwardingDisabledProcessorContext(context));
+        getter.init(context);
 
         final String result = getter.get("Key").value();
 
@@ -224,7 +224,7 @@ public class KTableTransformValuesTest {
         replay(context, stateStore);
 
         final KTableValueGetter<String, String> getter = transformValues.view().get();
-        getter.init(new ForwardingDisabledProcessorContext(context));
+        getter.init(context);
 
         final String result = getter.get("Key").value();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index 1a1f413..b25febf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
 public class SessionCacheFlushListenerTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
-        final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
index b2a4a54..e99c684 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
@@ -57,7 +57,7 @@ public class SessionTupleForwarderTest {
 
     private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+        final ProcessorContext context = mock(ProcessorContext.class);
 
         expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
         if (sendOldValued) {
@@ -83,7 +83,7 @@ public class SessionTupleForwarderTest {
     @Test
     public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+        final ProcessorContext context = mock(ProcessorContext.class);
 
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 90123cc..38ef5c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -31,7 +31,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardValueTimestampIfNewValueExists() {
-        final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -53,7 +53,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardParameterTimestampIfNewValueIsNull() {
-        final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index e8f9e07..52a5fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -57,7 +57,7 @@ public class TimestampedTupleForwarderTest {
 
     private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+        final ProcessorContext context = mock(ProcessorContext.class);
 
         expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
         if (sendOldValues) {
@@ -81,7 +81,7 @@ public class TimestampedTupleForwarderTest {
     @Test
     public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+        final ProcessorContext context = mock(ProcessorContext.class);
 
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 8002d09..4a3ee7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -184,7 +184,7 @@ public class AbstractProcessorContextTest {
     }
 
 
-    private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> {
+    private static class TestProcessorContext extends AbstractProcessorContext {
         static Properties config;
         static {
             config = getStreamsConfig();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
new file mode 100644
index 0000000..c6b2cbe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class ForwardingDisabledProcessorContextTest { // forward() calls on the wrapper are expected to throw StreamsException
+    @Mock(MockType.NICE)
+    private ProcessorContext delegate; // mocked context handed to the wrapper under test
+    private ForwardingDisabledProcessorContext context; // object under test, rebuilt in setUp()
+
+    @Before
+    public void setUp() {
+        context = new ForwardingDisabledProcessorContext(delegate); // wrap the mock delegate
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForward() { // two-argument forward(key, value)
+        context.forward("key", "value");
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForwardWithTo() { // forward(key, value, To)
+        context.forward("key", "value", To.all());
+    }
+
+    @SuppressWarnings("deprecation") // need to test deprecated code until removed
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForwardWithChildIndex() { // deprecated overload taking an int as third argument
+        context.forward("key", "value", 1);
+    }
+
+    @SuppressWarnings("deprecation") // need to test deprecated code until removed
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForwardWithChildName() { // deprecated overload taking a String as third argument
+        context.forward("key", "value", "child1");
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 60a8a5e..8443e55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -51,7 +51,7 @@ public class GlobalProcessorContextImplTest {
     private static final String UNKNOWN_STORE = "unknown-store";
     private static final String CHILD_PROCESSOR = "child";
 
-    private GlobalProcessorContextImpl<Object, Object> globalContext;
+    private GlobalProcessorContextImpl globalContext;
 
     private ProcessorNode<?, ?> child;
     private ProcessorRecordContext recordContext;
@@ -74,7 +74,7 @@ public class GlobalProcessorContextImplTest {
         expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
         replay(stateManager);
 
-        globalContext = new GlobalProcessorContextImpl<>(
+        globalContext = new GlobalProcessorContextImpl(
             streamsConfig,
             stateManager,
             null,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 9255053..5b52e9d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -55,7 +55,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ProcessorContextImplTest {
-    private ProcessorContextImpl<Object, Object> context;
+    private ProcessorContextImpl context;
 
     private static final String KEY = "key";
     private static final long VALUE = 42L;
@@ -121,7 +121,7 @@ public class ProcessorContextImplTest {
 
         replay(stateManager);
 
-        context = new ProcessorContextImpl<>(
+        context = new ProcessorContextImpl(
             mock(TaskId.class),
             mock(StreamTask.class),
             streamsConfig,
@@ -524,10 +524,10 @@ public class ProcessorContextImplTest {
     }
 
     private <T extends StateStore> void doTest(final String name, final Consumer<T> checker) {
-        final Processor<String, Long> processor = new Processor<String, Long>() {
+        final Processor processor = new Processor<String, Long>() {
             @Override
             @SuppressWarnings("unchecked")
-            public void init(final ProcessorContext<Object, Object> context) {
+            public void init(final ProcessorContext context) {
                 final T store = (T) context.getStateStore(name);
                 checker.accept(store);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index ff632eb..2fdba0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1481,7 +1481,7 @@ public class StreamThreadTest {
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
         final ProcessorSupplier<Object, Object> punctuateProcessor = () -> new Processor<Object, Object>() {
             @Override
-            public void init(final ProcessorContext<Object, Object> context) {
+            public void init(final ProcessorContext context) {
                 context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
                 context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index fce8ce1..89740c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -331,7 +331,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @return the processing context; never null
      * @see #addEntryToRestoreLog(Object, Object)
      */
-    public ProcessorContext<Object, Object> context() {
+    public ProcessorContext context() {
         return context;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 3927d28..ca5fd2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -50,7 +50,7 @@ import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context);
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
 
     protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 92f3e99..ffc8134 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -87,7 +87,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("cache-store"),
                 (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 330d53c..7c0d16c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -28,7 +28,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
             Stores.inMemoryKeyValueStore("my-store"),
             (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 2af4130..62f8949 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -31,7 +31,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.inMemoryKeyValueStore("my-store"),
                 (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 9aa100a..2a86cdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -36,7 +36,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.lruMap("my-store", 10),
                 (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5e79051..504aa9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -38,7 +38,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("my-store"),
                 (Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 441448c..990bfdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -48,7 +48,7 @@ public class SmokeTestUtil {
                     private long largestOffset = Long.MIN_VALUE;
 
                     @Override
-                    public void init(final ProcessorContext<Object, Object> context) {
+                    public void init(final ProcessorContext context) {
                         super.init(context);
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         System.out.flush();
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index cde5735..c5aedb5 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -54,7 +54,7 @@ import java.util.Map;
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
 public class InternalMockProcessorContext
-    extends AbstractProcessorContext<Object, Object>
+    extends AbstractProcessorContext
     implements RecordCollector.Supplier {
 
     private final File stateDir;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 6817504..38b4160 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -32,12 +32,10 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
 
     private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
-    private ProcessorNode<?, ?> currentNode;
+    private ProcessorNode currentNode;
     private RecordCollector recordCollector;
 
     public MockInternalProcessorContext() {
@@ -69,12 +67,12 @@ public class MockInternalProcessorContext
     }
 
     @Override
-    public void setCurrentNode(final ProcessorNode<?, ?> currentNode) {
+    public void setCurrentNode(final ProcessorNode currentNode) {
         this.currentNode = currentNode;
     }
 
     @Override
-    public ProcessorNode<?, ?> currentNode() {
+    public ProcessorNode currentNode() {
         return currentNode;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 70b93ea..8e3ce7d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -56,7 +56,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     }
 
     @Override
-    public void init(final ProcessorContext<Object, Object> context) {
+    public void init(final ProcessorContext context) {
         super.init(context);
         if (scheduleInterval > 0L) {
             scheduleCancellable = context.schedule(
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 60f6045..718e5af 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -52,7 +52,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     @Override
-    public void init(final InternalProcessorContext<Object, Object> context) {
+    public void init(final InternalProcessorContext context) {
         super.init(context);
         initialized = true;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index d494f10..77dd418 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -33,7 +33,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-public class NoOpProcessorContext extends AbstractProcessorContext<Object, Object> {
+public class NoOpProcessorContext extends AbstractProcessorContext {
     public boolean initialized;
     @SuppressWarnings("WeakerAccess")
     public Map<Object, Object> forwardedValues = new HashMap<>();
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
index fdce551..c3c6403 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
@@ -97,7 +97,7 @@ private[scala] object FunctionsCompatConversions {
       val innerTransformer = supplier.get()
       new Transformer[K, V, JIterable[VO]] {
         override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
-        override def init(context: ProcessorContext[Object, Object]): Unit = innerTransformer.init(context)
+        override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
         override def close(): Unit = innerTransformer.close()
       }
     }
@@ -108,7 +108,7 @@ private[scala] object FunctionsCompatConversions {
       val innerTransformer = supplier.get()
       new ValueTransformer[V, JIterable[VO]] {
         override def transform(value: V): JIterable[VO] = innerTransformer.transform(value).asJava
-        override def init(context: ProcessorContext[Void, Void]): Unit = innerTransformer.init(context)
+        override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
         override def close(): Unit = innerTransformer.close()
       }
     }
@@ -120,7 +120,7 @@ private[scala] object FunctionsCompatConversions {
       val innerTransformer = supplier.get()
       new ValueTransformerWithKey[K, V, JIterable[VO]] {
         override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
-        override def init(context: ProcessorContext[Void, Void]): Unit = innerTransformer.init(context)
+        override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
         override def close(): Unit = innerTransformer.close()
       }
     }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 616ee5d..3107db6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -291,7 +291,7 @@ class TopologyTest {
         .transform(
           () =>
             new Transformer[String, String, KeyValue[String, String]] {
-              override def init(context: ProcessorContext[Object, Object]): Unit = ()
+              override def init(context: ProcessorContext): Unit = ()
               override def transform(key: String, value: String): KeyValue[String, String] =
                 new KeyValue(key, value.toLowerCase)
               override def close(): Unit = ()
@@ -312,7 +312,7 @@ class TopologyTest {
       val lowered: KStreamJ[String, String] = textLines.transform(
         () =>
           new Transformer[String, String, KeyValue[String, String]] {
-            override def init(context: ProcessorContext[Object, Object]): Unit = ()
+            override def init(context: ProcessorContext): Unit = ()
             override def transform(key: String, value: String): KeyValue[String, String] =
               new KeyValue(key, value.toLowerCase)
             override def close(): Unit = ()
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 66bf313..e5a0aad 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -185,7 +185,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
   "transform a KStream" should "transform correctly records" in {
     class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
-      override def init(context: ProcessorContext[Object, Object]): Unit = {}
+      override def init(context: ProcessorContext): Unit = {}
       override def transform(key: String, value: String): KeyValue[String, String] =
         new KeyValue(s"$key-transformed", s"$value-transformed")
       override def close(): Unit = {}
@@ -217,7 +217,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
   "flatTransform a KStream" should "flatTransform correctly records" in {
     class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
-      override def init(context: ProcessorContext[Object, Object]): Unit = {}
+      override def init(context: ProcessorContext): Unit = {}
       override def transform(key: String, value: String): Iterable[KeyValue[String, String]] =
         Array(new KeyValue(s"$key-transformed", s"$value-transformed"))
       override def close(): Unit = {}
@@ -249,7 +249,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
   "flatTransformValues a KStream" should "correctly flatTransform values in records" in {
     class TestTransformer extends ValueTransformer[String, Iterable[String]] {
-      override def init(context: ProcessorContext[Void, Void]): Unit = {}
+      override def init(context: ProcessorContext): Unit = {}
       override def transform(value: String): Iterable[String] =
         Array(s"$value-transformed")
       override def close(): Unit = {}
@@ -282,7 +282,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
   "flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in {
     class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
-      override def init(context: ProcessorContext[Void, Void]): Unit = {}
+      override def init(context: ProcessorContext): Unit = {}
       override def transform(key: String, value: String): Iterable[String] =
         Array(s"$value-transformed-$key")
       override def close(): Unit = {}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 0a5a560..bfcfb87 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -425,8 +425,8 @@ public class TopologyTestDriver implements Closeable {
                 streamsConfig
             );
 
-            final GlobalProcessorContextImpl<Object, Object> globalProcessorContext =
-                new GlobalProcessorContextImpl<>(streamsConfig, globalStateManager, streamsMetrics, cache);
+            final GlobalProcessorContextImpl globalProcessorContext =
+                new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
             globalStateManager.setGlobalProcessorContext(globalProcessorContext);
 
             globalStateTask = new GlobalStateUpdateTask(
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index fbca146..73da6ef 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -55,7 +55,7 @@ import java.util.Properties;
  * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
  * {@link Topology} and using the {@link TopologyTestDriver}.
  */
-public class MockProcessorContext implements ProcessorContext<Object, Object>, RecordCollector.Supplier {
+public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
     // Immutable fields ================================================
     private final StreamsMetricsImpl metrics;
     private final TaskId taskId;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index ee6853f..fb5585f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -236,7 +236,7 @@ public class TopologyTestDriverTest {
 
     private final static class MockProcessor implements Processor<Object, Object> {
         private final Collection<Punctuation> punctuations;
-        private ProcessorContext<Object, Object> context;
+        private ProcessorContext context;
 
         private boolean initialized = false;
         private boolean closed = false;
@@ -247,7 +247,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             initialized = true;
             this.context = context;
             for (final Punctuation punctuation : punctuations) {
@@ -1284,12 +1284,12 @@ public class TopologyTestDriverTest {
     }
 
     private static class CustomMaxAggregator implements Processor<String, Long> {
-        ProcessorContext<Object, Object> context;
+        ProcessorContext context;
         private KeyValueStore<String, Long> store;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext<Object, Object> context) {
+        public void init(final ProcessorContext context) {
             this.context = context;
             context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
             context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> flushStore());