You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/08 20:31:48 UTC

[kafka] branch trunk updated: MINOR: cleanup deprectaion annotations (#6290)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3619d2f  MINOR: cleanup deprectaion annotations (#6290)
3619d2f is described below

commit 3619d2f383f65108dfd33686119f675aaeea54b7
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 12:31:34 2019 -0800

    MINOR: cleanup deprectaion annotations (#6290)
    
    If deprecated interface methods are inherited, the @Deprication tag should be used (instead on suppressing the deprecation warning).
    
    Reviewers:  Guozhang Wang <wa...@gmail.com>,  John Roesler <jo...@confluent.io>,  Bill Bejeck <bb...@gmail.com>
---
 .../apache/kafka/streams/kstream/JoinWindows.java  | 14 +----
 .../kafka/streams/kstream/SessionWindows.java      |  2 -
 .../apache/kafka/streams/kstream/TimeWindows.java  | 14 ++---
 .../kafka/streams/kstream/UnlimitedWindows.java    |  9 +--
 .../org/apache/kafka/streams/kstream/Windows.java  |  3 +-
 .../streams/kstream/internals/SessionWindow.java   |  2 -
 .../internals/AbstractProcessorContext.java        |  4 --
 .../ForwardingDisabledProcessorContext.java        |  6 +-
 .../internals/GlobalProcessorContextImpl.java      |  4 +-
 .../internals/InternalProcessorContext.java        |  3 +
 .../processor/internals/ProcessorContextImpl.java  | 36 ++++++------
 .../processor/internals/StandbyContextImpl.java    |  8 +--
 .../org/apache/kafka/streams/state/Stores.java     | 14 ++---
 .../apache/kafka/streams/state/WindowStore.java    | 15 +++--
 .../state/internals/CachingWindowStore.java        |  6 +-
 .../internals/ChangeLoggingWindowBytesStore.java   |  6 +-
 .../internals/CompositeReadOnlyWindowStore.java    | 66 ++++++++++++++--------
 .../state/internals/MeteredWindowStore.java        |  6 +-
 .../state/internals/RocksDBWindowStore.java        |  6 +-
 .../internals/AbstractProcessorContextTest.java    | 20 ++++---
 .../ForwardingDisabledProcessorContextTest.java    |  2 +
 .../internals/GlobalProcessorContextImplTest.java  |  2 +
 .../processor/internals/ProcessorTopologyTest.java |  4 +-
 .../internals/RecordDeserializerTest.java          |  3 +-
 .../kafka/test/InternalMockProcessorContext.java   | 14 ++---
 .../kafka/test/MockInternalProcessorContext.java   |  8 +--
 .../apache/kafka/test/NoOpProcessorContext.java    | 22 ++++----
 .../streams/processor/MockProcessorContext.java    | 16 ++++--
 28 files changed, 165 insertions(+), 150 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 219489f..6331877 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -92,7 +92,7 @@ public final class JoinWindows extends Windows<Window> {
         this.maintainDurationMs = maintainDurationMs;
     }
 
-    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
+    @Deprecated // removing segments from Windows will fix this
     private JoinWindows(final long beforeMs,
                         final long afterMs,
                         final long graceMs,
@@ -131,7 +131,6 @@ public final class JoinWindows extends Windows<Window> {
      * @param timeDifference join window interval
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings("deprecation")
     public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         return of(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
@@ -148,7 +147,6 @@ public final class JoinWindows extends Windows<Window> {
      * @throws IllegalArgumentException if the resulting window size is negative
      * @deprecated Use {@link #before(Duration)} instead.
      */
-    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Deprecated
     public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
         return new JoinWindows(timeDifferenceMs, afterMs, graceMs, maintainDurationMs, segments);
@@ -164,7 +162,6 @@ public final class JoinWindows extends Windows<Window> {
      * @param timeDifference relative window start time
      * @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         return before(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
@@ -181,7 +178,6 @@ public final class JoinWindows extends Windows<Window> {
      * @throws IllegalArgumentException if the resulting window size is negative
      * @deprecated Use {@link #after(Duration)} instead
      */
-    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Deprecated
     public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
         return new JoinWindows(beforeMs, timeDifferenceMs, graceMs, maintainDurationMs, segments);
@@ -197,7 +193,6 @@ public final class JoinWindows extends Windows<Window> {
      * @param timeDifference relative window end time
      * @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         return after(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
@@ -239,7 +234,6 @@ public final class JoinWindows extends Windows<Window> {
         return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, maintainDurationMs, segments);
     }
 
-    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     @Override
     public long gracePeriodMs() {
         // NOTE: in the future, when we remove maintainMs,
@@ -254,7 +248,6 @@ public final class JoinWindows extends Windows<Window> {
      * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
      * @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} instead.
      */
-    @SuppressWarnings("deprecation")
     @Override
     @Deprecated
     public JoinWindows until(final long durationMs) throws IllegalArgumentException {
@@ -272,14 +265,13 @@ public final class JoinWindows extends Windows<Window> {
      * @return the window maintain duration
      * @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}.
      */
-    @SuppressWarnings({"deprecation", "deprecatedMemberStillInUse"})
     @Override
     @Deprecated
     public long maintainMs() {
         return Math.max(maintainDurationMs, size());
     }
 
-    @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -296,7 +288,7 @@ public final class JoinWindows extends Windows<Window> {
             graceMs == that.graceMs;
     }
 
-    @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public int hashCode() {
         return Objects.hash(beforeMs, afterMs, graceMs, maintainDurationMs, segments);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 9c77fa5..c0153a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -108,7 +108,6 @@ public final class SessionWindows {
      *
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings("deprecation")
     public static SessionWindows with(final Duration inactivityGap) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
         return with(ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix));
@@ -163,7 +162,6 @@ public final class SessionWindows {
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     public long gracePeriodMs() {
-
         // NOTE: in the future, when we remove maintainMs,
         // we should default the grace period to 24h to maintain the default behavior,
         // or we can default to (24h - gapMs) if you want to be super accurate.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 03203f0..a87dbf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -79,7 +79,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
     }
 
     /** Private constructor for preserving segments. Can be removed along with Windows.segments. **/
-    @SuppressWarnings("DeprecatedIsStillUsed")
     @Deprecated
     private TimeWindows(final long sizeMs,
                         final long advanceMs,
@@ -127,7 +126,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @return a new window definition with default maintain duration of 1 day
      * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing #of(final long sizeMs) will fix this
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         return of(ApiUtils.validateMillisecondDuration(size, msgPrefix));
@@ -145,7 +144,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
      * @deprecated Use {@link #advanceBy(Duration)} instead
      */
-    @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
     @Deprecated
     public TimeWindows advanceBy(final long advanceMs) {
         if (advanceMs <= 0 || advanceMs > sizeMs) {
@@ -166,7 +164,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @return a new window definition with default maintain duration of 1 day
      * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
      */
-    @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
+    @SuppressWarnings("deprecation") // removing #advanceBy(final long advanceMs) will fix this
     public TimeWindows advanceBy(final Duration advance) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
         return advanceBy(ApiUtils.validateMillisecondDuration(advance, msgPrefix));
@@ -227,7 +225,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @deprecated since 2.1. Use {@link Materialized#retention} or directly configure the retention in a store supplier
      *             and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
-    @SuppressWarnings("deprecation")
     @Override
     @Deprecated
     public TimeWindows until(final long durationMs) throws IllegalArgumentException {
@@ -245,14 +242,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @return the window maintain duration
      * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
-    @SuppressWarnings({"DeprecatedIsStillUsed", "deprecation"})
     @Override
     @Deprecated
     public long maintainMs() {
         return Math.max(maintainDurationMs, sizeMs);
     }
 
-    @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -269,13 +265,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
             graceMs == that.graceMs;
     }
 
-    @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public int hashCode() {
         return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, graceMs);
     }
 
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public String toString() {
         return "TimeWindows{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index e1894ba..f8ec6ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -84,7 +84,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * @return a new unlimited window that starts at {@code start}
      * @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds}
      */
-    @SuppressWarnings("deprecation")
     public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
         return startOn(ApiUtils.validateMillisecondInstant(start, msgPrefix));
@@ -120,7 +119,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * @throws IllegalArgumentException on every invocation.
      * @deprecated since 2.1.
      */
-    @SuppressWarnings("deprecation")
     @Override
     @Deprecated
     public UnlimitedWindows until(final long durationMs) {
@@ -134,7 +132,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * @return the window retention time that is {@link Long#MAX_VALUE}
      * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
-    @SuppressWarnings("deprecation")
     @Override
     @Deprecated
     public long maintainMs() {
@@ -146,7 +143,7 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
         return 0L;
     }
 
-    @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -159,13 +156,13 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
         return startMs == that.startMs && segments == that.segments;
     }
 
-    @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public int hashCode() {
         return Objects.hash(startMs, segments);
     }
 
-    @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+    @SuppressWarnings("deprecation") // removing segments from Windows will fix this
     @Override
     public String toString() {
         return "UnlimitedWindows{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index feaee1e..e122b4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -46,7 +46,7 @@ public abstract class Windows<W extends Window> {
 
     protected Windows() {}
 
-    @SuppressWarnings("deprecation") // remove this constructor when we remove segments.
+    @Deprecated // remove this constructor when we remove segments.
     Windows(final int segments) {
         this.segments = segments;
     }
@@ -77,7 +77,6 @@ public abstract class Windows<W extends Window> {
      * @return the window maintain duration
      * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
-    @SuppressWarnings("DeprecatedIsStillUsed")
     @Deprecated
     public long maintainMs() {
         return maintainDurationMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
index 8111cdf..3057e32 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.kstream.Window;
 
 /**
@@ -29,7 +28,6 @@ import org.apache.kafka.streams.kstream.Window;
  * @see org.apache.kafka.streams.kstream.SessionWindows
  * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
-@InterfaceStability.Unstable
 public final class SessionWindow extends Window {
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index af8b073..ef1799b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -127,7 +127,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         if (recordContext == null) {
             throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
         }
-
         return recordContext.partition();
     }
 
@@ -139,7 +138,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         if (recordContext == null) {
             throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
         }
-
         return recordContext.offset();
     }
 
@@ -148,7 +146,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         if (recordContext == null) {
             throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
         }
-
         return recordContext.headers();
     }
 
@@ -160,7 +157,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         if (recordContext == null) {
             throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
         }
-
         return recordContext.timestamp();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 0ef70b7..ba39368 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.time.Duration;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -31,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 
@@ -110,14 +110,14 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new StreamsException("ProcessorContext#forward() not supported.");
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 900cc71..0693ef7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -88,8 +88,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     /**
      * @throws UnsupportedOperationException on every invocation
      */
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
@@ -97,8 +97,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     /**
      * @throws UnsupportedOperationException on every invocation
      */
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 0f67dff..2a1d05e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -47,6 +47,9 @@ public interface InternalProcessorContext extends ProcessorContext {
      */
     void setCurrentNode(ProcessorNode currentNode);
 
+    /**
+     * Get the current {@link ProcessorNode}
+     */
     ProcessorNode currentNode();
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 764d50c..2afd5e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -128,8 +128,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         forward(key, value, SEND_TO_ALL);
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @SuppressWarnings("unchecked")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
@@ -139,8 +140,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @SuppressWarnings("unchecked")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
@@ -192,16 +194,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @Override
     @Deprecated
-    public Cancellable schedule(final long interval,
+    public Cancellable schedule(final long intervalMs,
                                 final PunctuationType type,
                                 final Punctuator callback) {
-        if (interval < 1) {
+        if (intervalMs < 1) {
             throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
         }
-        return task.schedule(interval, type, callback);
+        return task.schedule(intervalMs, type, callback);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
@@ -315,16 +317,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             return wrapped().fetch(key, time);
         }
 
-        @Deprecated
         @Override
+        @Deprecated
         public WindowStoreIterator<V> fetch(final K key,
                                             final long timeFrom,
                                             final long timeTo) {
             return wrapped().fetch(key, timeFrom, timeTo);
         }
 
-        @Deprecated
         @Override
+        @Deprecated
         public KeyValueIterator<Windowed<K>, V> fetch(final K from,
                                                       final K to,
                                                       final long timeFrom,
@@ -337,8 +339,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             return wrapped().all();
         }
 
-        @Deprecated
         @Override
+        @Deprecated
         public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
                                                          final long timeTo) {
             return wrapped().fetchAll(timeFrom, timeTo);
@@ -505,7 +507,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             return wrapped().fetch(key, time);
         }
 
-        @Deprecated
+        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
         @Override
         public WindowStoreIterator<V> fetch(final K key,
                                             final long timeFrom,
@@ -513,7 +515,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             return wrapped().fetch(key, timeFrom, timeTo);
         }
 
-        @Deprecated
+        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
         @Override
         public KeyValueIterator<Windowed<K>, V> fetch(final K from,
                                                       final K to,
@@ -522,17 +524,17 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             return wrapped().fetch(from, to, timeFrom, timeTo);
         }
 
-        @Override
-        public KeyValueIterator<Windowed<K>, V> all() {
-            return wrapped().all();
-        }
-
-        @Deprecated
+        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
         @Override
         public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
                                                          final long timeTo) {
             return wrapped().fetchAll(timeFrom, timeTo);
         }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return wrapped().all();
+        }
     }
 
     private static class TimestampedWindowStoreReadWriteDecorator<K, V>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ee69373..49dc5f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.time.Duration;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
@@ -33,6 +32,7 @@ import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
 
@@ -161,8 +161,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     /**
      * @throws UnsupportedOperationException on every invocation
      */
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
@@ -170,8 +170,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     /**
      * @throws UnsupportedOperationException on every invocation
      */
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
@@ -188,7 +188,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    @SuppressWarnings("deprecation")
+    @Deprecated
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 113e531..ac2a023 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -196,7 +196,7 @@ public class Stores {
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
      */
-    @Deprecated
+    @Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                  final long retentionPeriod,
                                                                  final int numSegments,
@@ -271,21 +271,21 @@ public class Stores {
     /**
      * Create a persistent {@link SessionBytesStoreSupplier}.
      * @param name              name of the store (cannot be {@code null})
-     * @param retentionPeriod   length ot time to retain data in the store (cannot be negative)
+     * @param retentionPeriodMs length ot time to retain data in the store (cannot be negative)
      *                          Note that the retention period must be at least long enough to contain the
      *                          windowed data's entire life cycle, from window-start through window-end,
      *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
      */
-    @Deprecated
+    @Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
-                                                                   final long retentionPeriod) {
+                                                                   final long retentionPeriodMs) {
         Objects.requireNonNull(name, "name cannot be null");
-        if (retentionPeriod < 0) {
+        if (retentionPeriodMs < 0) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
         }
-        return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
+        return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs);
     }
 
     /**
@@ -297,7 +297,7 @@ public class Stores {
      *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing #persistentSessionStore(String name, long retentionPeriodMs) will fix this
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final Duration retentionPeriod) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index f7eb37e..83a0ee1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -91,11 +91,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @throws InvalidStateStoreException if the store is not initialized
      * @throws NullPointerException if the given key is {@code null}
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
     @Override
-    default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
+    default WindowStoreIterator<V> fetch(final K key,
+                                         final Instant from,
+                                         final Instant to) {
         return fetch(
             key,
             ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
@@ -115,11 +117,14 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @throws InvalidStateStoreException if the store is not initialized
      * @throws NullPointerException if one of the given keys is {@code null}
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
 
     @Override
-    default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
+    default KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                   final K to,
+                                                   final Instant fromTime,
+                                                   final Instant toTime) {
         return fetch(
             from,
             to,
@@ -135,7 +140,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 0a869da..0edd8f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -165,7 +165,7 @@ class CachingWindowStore
         }
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key,
                                                           final long timeFrom,
@@ -190,7 +190,7 @@ class CachingWindowStore
         return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
                                                            final Bytes to,
@@ -221,7 +221,7 @@ class CachingWindowStore
         );
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
                                                               final long timeTo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index ef5a4c7..c58e9f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -65,7 +65,7 @@ class ChangeLoggingWindowBytesStore
         return wrapped().fetch(key, timestamp);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key,
                                              final long from,
@@ -73,7 +73,7 @@ class ChangeLoggingWindowBytesStore
         return wrapped().fetch(key, from, to);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
                                                            final Bytes keyTo,
@@ -87,7 +87,7 @@ class ChangeLoggingWindowBytesStore
         return wrapped().all();
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
                                                               final long timeTo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 0908085..fbfc7a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -69,7 +69,9 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
 
     @Override
     @Deprecated
-    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+    public WindowStoreIterator<V> fetch(final K key,
+                                        final long timeFrom,
+                                        final long timeTo) {
         Objects.requireNonNull(key, "key can't be null");
         final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
         for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
@@ -89,29 +91,39 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
         return KeyValueIterators.emptyWindowStoreIterator();
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing fetch(K from, long from, long to) will fix this
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
+    public WindowStoreIterator<V> fetch(final K key,
+                                        final Instant from,
+                                        final Instant to) throws IllegalArgumentException {
         return fetch(
             key,
             ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
             ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing fetch(K from, K to, long from, long to) will fix this
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final long timeFrom,
+                                                  final long timeTo) {
         Objects.requireNonNull(from, "from can't be null");
         Objects.requireNonNull(to, "to can't be null");
-        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to, timeFrom, timeTo);
-        return new DelegatingPeekingKeyValueIterator<>(storeName,
-                                                       new CompositeKeyValueIterator<>(
-                                                               provider.stores(storeName, windowStoreType).iterator(),
-                                                               nextIteratorFunction));
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
+            store -> store.fetch(from, to, timeFrom, timeTo);
+        return new DelegatingPeekingKeyValueIterator<>(
+            storeName,
+            new CompositeKeyValueIterator<>(
+                provider.stores(storeName, windowStoreType).iterator(),
+                nextIteratorFunction));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final Instant fromTime,
+                                                  final Instant toTime) throws IllegalArgumentException {
         return fetch(
             from,
             to,
@@ -121,26 +133,32 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = ReadOnlyWindowStore::all;
-        return new DelegatingPeekingKeyValueIterator<>(storeName,
-                new CompositeKeyValueIterator<>(
-                        provider.stores(storeName, windowStoreType).iterator(),
-                        nextIteratorFunction));
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
+            ReadOnlyWindowStore::all;
+        return new DelegatingPeekingKeyValueIterator<>(
+            storeName,
+            new CompositeKeyValueIterator<>(
+                provider.stores(storeName, windowStoreType).iterator(),
+                nextIteratorFunction));
     }
 
     @Override
     @Deprecated
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
-        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetchAll(timeFrom, timeTo);
-        return new DelegatingPeekingKeyValueIterator<>(storeName,
-                new CompositeKeyValueIterator<>(
-                        provider.stores(storeName, windowStoreType).iterator(),
-                        nextIteratorFunction));
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                     final long timeTo) {
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
+            store -> store.fetchAll(timeFrom, timeTo);
+        return new DelegatingPeekingKeyValueIterator<>(
+            storeName,
+            new CompositeKeyValueIterator<>(
+                provider.stores(storeName, windowStoreType).iterator(),
+                nextIteratorFunction));
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing fetchAll(long from, long to) will fix this
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from,
+                                                     final Instant to) throws IllegalArgumentException {
         return fetchAll(
             ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
             ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 681b210..6d2eaab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -159,7 +159,7 @@ public class MeteredWindowStore<K, V>
         }
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public WindowStoreIterator<V> fetch(final K key,
                                         final long timeFrom,
@@ -171,7 +171,7 @@ public class MeteredWindowStore<K, V>
                                                 time);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from,
                                                   final K to,
@@ -185,7 +185,7 @@ public class MeteredWindowStore<K, V>
             time);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
                                                      final long timeTo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e621290..3b634eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -69,14 +69,14 @@ public class RocksDBWindowStore
         return bytesValue;
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo);
         return new WindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator();
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
                                                            final Bytes to,
@@ -92,7 +92,7 @@ public class RocksDBWindowStore
         return new WindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 8afd302..1548e7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.time.Duration;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -36,6 +35,7 @@ import org.apache.kafka.test.MockKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
@@ -171,12 +171,16 @@ public class AbstractProcessorContextTest {
     @SuppressWarnings("unchecked")
     @Test
     public void appConfigsShouldReturnParsedValues() {
-        assertThat((Class<RocksDBConfigSetter>) context.appConfigs().get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG), equalTo(RocksDBConfigSetter.class));
+        assertThat(
+            context.appConfigs().get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG),
+            equalTo(RocksDBConfigSetter.class));
     }
 
     @Test
     public void appConfigsShouldReturnUnrecognizedValues() {
-        assertThat((String) context.appConfigs().get("user.supplied.config"), equalTo("user-suppplied-value"));
+        assertThat(
+            context.appConfigs().get("user.supplied.config"),
+            equalTo("user-suppplied-value"));
     }
 
 
@@ -198,9 +202,11 @@ public class AbstractProcessorContextTest {
             return null;
         }
 
-        @SuppressWarnings("deprecation")
         @Override
-        public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+        @Deprecated
+        public Cancellable schedule(final long interval,
+                                    final PunctuationType type,
+                                    final Punctuator callback) {
             return null;
         }
 
@@ -217,12 +223,12 @@ public class AbstractProcessorContextTest {
         @Override
         public <K, V> void forward(final K key, final V value, final To to) {}
 
-        @SuppressWarnings("deprecation")
         @Override
+        @Deprecated
         public <K, V> void forward(final K key, final V value, final int childIndex) {}
 
-        @SuppressWarnings("deprecation")
         @Override
+        @Deprecated
         public <K, V> void forward(final K key, final V value, final String childName) {}
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
index 03e79b7..c6b2cbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
@@ -47,11 +47,13 @@ public class ForwardingDisabledProcessorContextTest {
         context.forward("key", "value", To.all());
     }
 
+    @SuppressWarnings("deprecation") // need to test deprecated code until removed
     @Test(expected = StreamsException.class)
     public void shouldThrowOnForwardWithChildIndex() {
         context.forward("key", "value", 1);
     }
 
+    @SuppressWarnings("deprecation") // need to test deprecated code until removed
     @Test(expected = StreamsException.class)
     public void shouldThrowOnForwardWithChildName() {
         context.forward("key", "value", "child1");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index deb14e9..4153cca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -106,11 +106,13 @@ public class GlobalProcessorContextImplTest {
         globalContext.forward(null, null, To.all());
     }
 
+    @SuppressWarnings("deprecation") // need to test deprecated code until removed
     @Test(expected = UnsupportedOperationException.class)
     public void shouldNotSupportForwardingViaChildIndex() {
         globalContext.forward(null, null, 0);
     }
 
+    @SuppressWarnings("deprecation") // need to test deprecated code until removed
     @Test(expected = UnsupportedOperationException.class)
     public void shouldNotSupportForwardingViaChildName() {
         globalContext.forward(null, null, "processorName");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 6a7bd02..1e3fad3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -611,7 +611,7 @@ public class ProcessorTopologyTest {
             this.numChildren = numChildren;
         }
 
-        @SuppressWarnings("deprecation")
+        @SuppressWarnings("deprecation") // need to test deprecated code until removed
         @Override
         public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
@@ -631,7 +631,7 @@ public class ProcessorTopologyTest {
             this.numChildren = numChildren;
         }
 
-        @SuppressWarnings("deprecation")
+        @SuppressWarnings("deprecation") // need to test deprecated code until removed
         @Override
         public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 2f2587c..3b38b7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -45,7 +45,6 @@ public class RecordDeserializerTest {
         new byte[0],
         headers);
 
-
     @SuppressWarnings("deprecation")
     @Test
     public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
@@ -81,7 +80,7 @@ public class RecordDeserializerTest {
                       final boolean valueThrowsException,
                       final Object key,
                       final Object value) {
-            super("", Collections.<String>emptyList(), null, null);
+            super("", Collections.emptyList(), null, null);
             this.keyThrowsException = keyThrowsException;
             this.valueThrowsException = valueThrowsException;
             this.key = key;
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index c9255ce..4b92679 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -159,7 +159,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         this.valSerde = valSerde;
     }
 
-    // serdes will override whatever specified in the configs
     @Override
     public Serde<?> keySerde() {
         return keySerde;
@@ -179,7 +178,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         if (stateDir == null) {
             throw new UnsupportedOperationException("State directory not specified");
         }
-
         return stateDir;
     }
 
@@ -195,8 +193,8 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         return storeMap.get(name);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
         throw new UnsupportedOperationException("schedule() not supported.");
     }
@@ -209,22 +207,24 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     }
 
     @Override
-    public void commit() { }
+    public void commit() {}
 
-    @Override
     @SuppressWarnings("unchecked")
+    @Override
     public <K, V> void forward(final K key, final V value) {
         forward(key, value, To.all());
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
         forward(key, value, To.child(childName));
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 62a8491..5ae97c9 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -63,12 +63,8 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     }
 
     @Override
-    public void initialize() {
-
-    }
+    public void initialize() {}
 
     @Override
-    public void uninitialize() {
-
-    }
+    public void uninitialize() {}
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index c7c8343..77dd418 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.test;
 
-import java.time.Duration;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
@@ -29,19 +28,21 @@ import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class NoOpProcessorContext extends AbstractProcessorContext {
     public boolean initialized;
+    @SuppressWarnings("WeakerAccess")
     public Map<Object, Object> forwardedValues = new HashMap<>();
 
     public NoOpProcessorContext() {
         super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null);
     }
 
-    static StreamsConfig streamsConfig() {
+    private static StreamsConfig streamsConfig() {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "boot");
@@ -53,9 +54,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
         return null;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
-    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+    @Deprecated
+    public Cancellable schedule(final long interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
         return null;
     }
 
@@ -76,21 +79,20 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
         forwardedValues.put(key, value);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         forward(key, value);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
         forward(key, value);
     }
 
     @Override
-    public void commit() {
-    }
+    public void commit() {}
 
     @Override
     public void initialize() {
@@ -99,7 +101,5 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
 
     @Override
     public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback) {
-        // no-op
-    }
+                         final StateRestoreCallback stateRestoreCallback) {}
 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 7b4c58b..34a7ed9 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -275,7 +275,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      * @param timestamp A record timestamp
      */
     @SuppressWarnings({"WeakerAccess", "unused"})
-    public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) {
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset,
+                                  final Headers headers,
+                                  final long timestamp) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
@@ -390,7 +394,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     @Deprecated
-    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+    public Cancellable schedule(final long intervalMs,
+                                final PunctuationType type,
+                                final Punctuator callback) {
         final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback);
 
         punctuators.add(capturedPunctuator);
@@ -398,7 +404,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return capturedPunctuator::cancel;
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
@@ -433,8 +439,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         );
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException(
             "Forwarding to a child by index is deprecated. " +
@@ -442,8 +448,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         );
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException(
             "Forwarding to a child by name is deprecated. " +