You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/08 20:31:48 UTC
[kafka] branch trunk updated: MINOR: cleanup deprectaion
annotations (#6290)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3619d2f MINOR: cleanup deprectaion annotations (#6290)
3619d2f is described below
commit 3619d2f383f65108dfd33686119f675aaeea54b7
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 12:31:34 2019 -0800
MINOR: cleanup deprectaion annotations (#6290)
If deprecated interface methods are inherited, the @Deprication tag should be used (instead on suppressing the deprecation warning).
Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../apache/kafka/streams/kstream/JoinWindows.java | 14 +----
.../kafka/streams/kstream/SessionWindows.java | 2 -
.../apache/kafka/streams/kstream/TimeWindows.java | 14 ++---
.../kafka/streams/kstream/UnlimitedWindows.java | 9 +--
.../org/apache/kafka/streams/kstream/Windows.java | 3 +-
.../streams/kstream/internals/SessionWindow.java | 2 -
.../internals/AbstractProcessorContext.java | 4 --
.../ForwardingDisabledProcessorContext.java | 6 +-
.../internals/GlobalProcessorContextImpl.java | 4 +-
.../internals/InternalProcessorContext.java | 3 +
.../processor/internals/ProcessorContextImpl.java | 36 ++++++------
.../processor/internals/StandbyContextImpl.java | 8 +--
.../org/apache/kafka/streams/state/Stores.java | 14 ++---
.../apache/kafka/streams/state/WindowStore.java | 15 +++--
.../state/internals/CachingWindowStore.java | 6 +-
.../internals/ChangeLoggingWindowBytesStore.java | 6 +-
.../internals/CompositeReadOnlyWindowStore.java | 66 ++++++++++++++--------
.../state/internals/MeteredWindowStore.java | 6 +-
.../state/internals/RocksDBWindowStore.java | 6 +-
.../internals/AbstractProcessorContextTest.java | 20 ++++---
.../ForwardingDisabledProcessorContextTest.java | 2 +
.../internals/GlobalProcessorContextImplTest.java | 2 +
.../processor/internals/ProcessorTopologyTest.java | 4 +-
.../internals/RecordDeserializerTest.java | 3 +-
.../kafka/test/InternalMockProcessorContext.java | 14 ++---
.../kafka/test/MockInternalProcessorContext.java | 8 +--
.../apache/kafka/test/NoOpProcessorContext.java | 22 ++++----
.../streams/processor/MockProcessorContext.java | 16 ++++--
28 files changed, 165 insertions(+), 150 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 219489f..6331877 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -92,7 +92,7 @@ public final class JoinWindows extends Windows<Window> {
this.maintainDurationMs = maintainDurationMs;
}
- @SuppressWarnings("deprecation") // removing segments from Windows will fix this
+ @Deprecated // removing segments from Windows will fix this
private JoinWindows(final long beforeMs,
final long afterMs,
final long graceMs,
@@ -131,7 +131,6 @@ public final class JoinWindows extends Windows<Window> {
* @param timeDifference join window interval
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings("deprecation")
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
return of(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
@@ -148,7 +147,6 @@ public final class JoinWindows extends Windows<Window> {
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #before(Duration)} instead.
*/
- @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Deprecated
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, afterMs, graceMs, maintainDurationMs, segments);
@@ -164,7 +162,6 @@ public final class JoinWindows extends Windows<Window> {
* @param timeDifference relative window start time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings("deprecation") // removing segments from Windows will fix this
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
return before(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
@@ -181,7 +178,6 @@ public final class JoinWindows extends Windows<Window> {
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #after(Duration)} instead
*/
- @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Deprecated
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(beforeMs, timeDifferenceMs, graceMs, maintainDurationMs, segments);
@@ -197,7 +193,6 @@ public final class JoinWindows extends Windows<Window> {
* @param timeDifference relative window end time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings("deprecation") // removing segments from Windows will fix this
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
return after(ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix));
@@ -239,7 +234,6 @@ public final class JoinWindows extends Windows<Window> {
return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, maintainDurationMs, segments);
}
- @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@Override
public long gracePeriodMs() {
// NOTE: in the future, when we remove maintainMs,
@@ -254,7 +248,6 @@ public final class JoinWindows extends Windows<Window> {
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
* @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} instead.
*/
- @SuppressWarnings("deprecation")
@Override
@Deprecated
public JoinWindows until(final long durationMs) throws IllegalArgumentException {
@@ -272,14 +265,13 @@ public final class JoinWindows extends Windows<Window> {
* @return the window maintain duration
* @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}.
*/
- @SuppressWarnings({"deprecation", "deprecatedMemberStillInUse"})
@Override
@Deprecated
public long maintainMs() {
return Math.max(maintainDurationMs, size());
}
- @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -296,7 +288,7 @@ public final class JoinWindows extends Windows<Window> {
graceMs == that.graceMs;
}
- @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public int hashCode() {
return Objects.hash(beforeMs, afterMs, graceMs, maintainDurationMs, segments);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 9c77fa5..c0153a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -108,7 +108,6 @@ public final class SessionWindows {
*
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings("deprecation")
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
return with(ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix));
@@ -163,7 +162,6 @@ public final class SessionWindows {
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
public long gracePeriodMs() {
-
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - gapMs) if you want to be super accurate.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 03203f0..a87dbf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -79,7 +79,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
}
/** Private constructor for preserving segments. Can be removed along with Windows.segments. **/
- @SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
private TimeWindows(final long sizeMs,
final long advanceMs,
@@ -127,7 +126,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing #of(final long sizeMs) will fix this
public static TimeWindows of(final Duration size) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
return of(ApiUtils.validateMillisecondDuration(size, msgPrefix));
@@ -145,7 +144,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
* @deprecated Use {@link #advanceBy(Duration)} instead
*/
- @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
@Deprecated
public TimeWindows advanceBy(final long advanceMs) {
if (advanceMs <= 0 || advanceMs > sizeMs) {
@@ -166,7 +164,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
*/
- @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
+ @SuppressWarnings("deprecation") // removing #advanceBy(final long advanceMs) will fix this
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
return advanceBy(ApiUtils.validateMillisecondDuration(advance, msgPrefix));
@@ -227,7 +225,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @deprecated since 2.1. Use {@link Materialized#retention} or directly configure the retention in a store supplier
* and use {@link Materialized#as(WindowBytesStoreSupplier)}.
*/
- @SuppressWarnings("deprecation")
@Override
@Deprecated
public TimeWindows until(final long durationMs) throws IllegalArgumentException {
@@ -245,14 +242,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return the window maintain duration
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
- @SuppressWarnings({"DeprecatedIsStillUsed", "deprecation"})
@Override
@Deprecated
public long maintainMs() {
return Math.max(maintainDurationMs, sizeMs);
}
- @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -269,13 +265,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
graceMs == that.graceMs;
}
- @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public int hashCode() {
return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, graceMs);
}
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public String toString() {
return "TimeWindows{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index e1894ba..f8ec6ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -84,7 +84,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
* @return a new unlimited window that starts at {@code start}
* @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds}
*/
- @SuppressWarnings("deprecation")
public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
return startOn(ApiUtils.validateMillisecondInstant(start, msgPrefix));
@@ -120,7 +119,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
* @throws IllegalArgumentException on every invocation.
* @deprecated since 2.1.
*/
- @SuppressWarnings("deprecation")
@Override
@Deprecated
public UnlimitedWindows until(final long durationMs) {
@@ -134,7 +132,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
* @return the window retention time that is {@link Long#MAX_VALUE}
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
- @SuppressWarnings("deprecation")
@Override
@Deprecated
public long maintainMs() {
@@ -146,7 +143,7 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
return 0L;
}
- @SuppressWarnings({"deprecation", "NonFinalFieldReferenceInEquals"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -159,13 +156,13 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
return startMs == that.startMs && segments == that.segments;
}
- @SuppressWarnings({"deprecation", "NonFinalFieldReferencedInHashCode"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public int hashCode() {
return Objects.hash(startMs, segments);
}
- @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
+ @SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public String toString() {
return "UnlimitedWindows{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index feaee1e..e122b4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -46,7 +46,7 @@ public abstract class Windows<W extends Window> {
protected Windows() {}
- @SuppressWarnings("deprecation") // remove this constructor when we remove segments.
+ @Deprecated // remove this constructor when we remove segments.
Windows(final int segments) {
this.segments = segments;
}
@@ -77,7 +77,6 @@ public abstract class Windows<W extends Window> {
* @return the window maintain duration
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
- @SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public long maintainMs() {
return maintainDurationMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
index 8111cdf..3057e32 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.Window;
/**
@@ -29,7 +28,6 @@ import org.apache.kafka.streams.kstream.Window;
* @see org.apache.kafka.streams.kstream.SessionWindows
* @see org.apache.kafka.streams.processor.TimestampExtractor
*/
-@InterfaceStability.Unstable
public final class SessionWindow extends Window {
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index af8b073..ef1799b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -127,7 +127,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
if (recordContext == null) {
throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
}
-
return recordContext.partition();
}
@@ -139,7 +138,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
if (recordContext == null) {
throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
}
-
return recordContext.offset();
}
@@ -148,7 +146,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
if (recordContext == null) {
throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
}
-
return recordContext.headers();
}
@@ -160,7 +157,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
if (recordContext == null) {
throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
}
-
return recordContext.timestamp();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 0ef70b7..ba39368 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.time.Duration;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
@@ -31,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import java.io.File;
+import java.time.Duration;
import java.util.Map;
import java.util.Objects;
@@ -110,14 +110,14 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
throw new StreamsException("ProcessorContext#forward() not supported.");
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 900cc71..0693ef7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -88,8 +88,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
/**
* @throws UnsupportedOperationException on every invocation
*/
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
}
@@ -97,8 +97,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
/**
* @throws UnsupportedOperationException on every invocation
*/
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 0f67dff..2a1d05e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -47,6 +47,9 @@ public interface InternalProcessorContext extends ProcessorContext {
*/
void setCurrentNode(ProcessorNode currentNode);
+ /**
+ * Get the current {@link ProcessorNode}
+ */
ProcessorNode currentNode();
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 764d50c..2afd5e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -128,8 +128,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
forward(key, value, SEND_TO_ALL);
}
- @SuppressWarnings({"unchecked", "deprecation"})
+ @SuppressWarnings("unchecked")
@Override
+ @Deprecated
public <K, V> void forward(final K key,
final V value,
final int childIndex) {
@@ -139,8 +140,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
}
- @SuppressWarnings({"unchecked", "deprecation"})
+ @SuppressWarnings("unchecked")
@Override
+ @Deprecated
public <K, V> void forward(final K key,
final V value,
final String childName) {
@@ -192,16 +194,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
@Deprecated
- public Cancellable schedule(final long interval,
+ public Cancellable schedule(final long intervalMs,
final PunctuationType type,
final Punctuator callback) {
- if (interval < 1) {
+ if (intervalMs < 1) {
throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
}
- return task.schedule(interval, type, callback);
+ return task.schedule(intervalMs, type, callback);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
@@ -315,16 +317,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return wrapped().fetch(key, time);
}
- @Deprecated
@Override
+ @Deprecated
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(key, timeFrom, timeTo);
}
- @Deprecated
@Override
+ @Deprecated
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
final long timeFrom,
@@ -337,8 +339,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return wrapped().all();
}
- @Deprecated
@Override
+ @Deprecated
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
@@ -505,7 +507,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return wrapped().fetch(key, time);
}
- @Deprecated
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
@@ -513,7 +515,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return wrapped().fetch(key, timeFrom, timeTo);
}
- @Deprecated
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
@@ -522,17 +524,17 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return wrapped().fetch(from, to, timeFrom, timeTo);
}
- @Override
- public KeyValueIterator<Windowed<K>, V> all() {
- return wrapped().all();
- }
-
- @Deprecated
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
}
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ return wrapped().all();
+ }
}
private static class TimestampedWindowStoreReadWriteDecorator<K, V>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ee69373..49dc5f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
@@ -33,6 +32,7 @@ import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
@@ -161,8 +161,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
/**
* @throws UnsupportedOperationException on every invocation
*/
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -170,8 +170,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
/**
* @throws UnsupportedOperationException on every invocation
*/
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -188,7 +188,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
* @throws UnsupportedOperationException on every invocation
*/
@Override
- @SuppressWarnings("deprecation")
+ @Deprecated
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 113e531..ac2a023 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -196,7 +196,7 @@ public class Stores {
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
- @Deprecated
+ @Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final int numSegments,
@@ -271,21 +271,21 @@ public class Stores {
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
- * @param retentionPeriod length ot time to retain data in the store (cannot be negative)
+ * @param retentionPeriodMs length ot time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @return an instance of a {@link SessionBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
*/
- @Deprecated
+ @Deprecated // continuing to support Windows#maintainMs/segmentInterval in fallback mode
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
- final long retentionPeriod) {
+ final long retentionPeriodMs) {
Objects.requireNonNull(name, "name cannot be null");
- if (retentionPeriod < 0) {
+ if (retentionPeriodMs < 0) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
- return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
+ return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs);
}
/**
@@ -297,7 +297,7 @@ public class Stores {
* and for the entire grace period.
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing #persistentSessionStore(String name, long retentionPeriodMs) will fix this
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index f7eb37e..83a0ee1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -91,11 +91,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if the given key is {@code null}
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@Override
- default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
+ default WindowStoreIterator<V> fetch(final K key,
+ final Instant from,
+ final Instant to) {
return fetch(
key,
ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
@@ -115,11 +117,14 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if one of the given keys is {@code null}
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
@Override
- default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
+ default KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final Instant fromTime,
+ final Instant toTime) {
return fetch(
from,
to,
@@ -135,7 +140,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 0a869da..0edd8f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -165,7 +165,7 @@ class CachingWindowStore
}
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key,
final long timeFrom,
@@ -190,7 +190,7 @@ class CachingWindowStore
return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
@@ -221,7 +221,7 @@ class CachingWindowStore
);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index ef5a4c7..c58e9f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -65,7 +65,7 @@ class ChangeLoggingWindowBytesStore
return wrapped().fetch(key, timestamp);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final long from,
@@ -73,7 +73,7 @@ class ChangeLoggingWindowBytesStore
return wrapped().fetch(key, from, to);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
@@ -87,7 +87,7 @@ class ChangeLoggingWindowBytesStore
return wrapped().all();
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 0908085..fbfc7a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -69,7 +69,9 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
@Override
@Deprecated
- public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
Objects.requireNonNull(key, "key can't be null");
final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
@@ -89,29 +91,39 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
return KeyValueIterators.emptyWindowStoreIterator();
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing fetch(K from, long from, long to) will fix this
@Override
- public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
+ public WindowStoreIterator<V> fetch(final K key,
+ final Instant from,
+ final Instant to) throws IllegalArgumentException {
return fetch(
key,
ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing fetch(K from, K to, long from, long to) will fix this
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
Objects.requireNonNull(from, "from can't be null");
Objects.requireNonNull(to, "to can't be null");
- final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to, timeFrom, timeTo);
- return new DelegatingPeekingKeyValueIterator<>(storeName,
- new CompositeKeyValueIterator<>(
- provider.stores(storeName, windowStoreType).iterator(),
- nextIteratorFunction));
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
+ store -> store.fetch(from, to, timeFrom, timeTo);
+ return new DelegatingPeekingKeyValueIterator<>(
+ storeName,
+ new CompositeKeyValueIterator<>(
+ provider.stores(storeName, windowStoreType).iterator(),
+ nextIteratorFunction));
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final Instant fromTime,
+ final Instant toTime) throws IllegalArgumentException {
return fetch(
from,
to,
@@ -121,26 +133,32 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
@Override
public KeyValueIterator<Windowed<K>, V> all() {
- final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = ReadOnlyWindowStore::all;
- return new DelegatingPeekingKeyValueIterator<>(storeName,
- new CompositeKeyValueIterator<>(
- provider.stores(storeName, windowStoreType).iterator(),
- nextIteratorFunction));
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
+ ReadOnlyWindowStore::all;
+ return new DelegatingPeekingKeyValueIterator<>(
+ storeName,
+ new CompositeKeyValueIterator<>(
+ provider.stores(storeName, windowStoreType).iterator(),
+ nextIteratorFunction));
}
@Override
@Deprecated
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
- final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = store -> store.fetchAll(timeFrom, timeTo);
- return new DelegatingPeekingKeyValueIterator<>(storeName,
- new CompositeKeyValueIterator<>(
- provider.stores(storeName, windowStoreType).iterator(),
- nextIteratorFunction));
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
+ final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
+ store -> store.fetchAll(timeFrom, timeTo);
+ return new DelegatingPeekingKeyValueIterator<>(
+ storeName,
+ new CompositeKeyValueIterator<>(
+ provider.stores(storeName, windowStoreType).iterator(),
+ nextIteratorFunction));
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing fetchAll(long from, long to) will fix this
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from,
+ final Instant to) throws IllegalArgumentException {
return fetchAll(
ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")),
ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 681b210..6d2eaab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -159,7 +159,7 @@ public class MeteredWindowStore<K, V>
}
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
@@ -171,7 +171,7 @@ public class MeteredWindowStore<K, V>
time);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
@@ -185,7 +185,7 @@ public class MeteredWindowStore<K, V>
time);
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e621290..3b634eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -69,14 +69,14 @@ public class RocksDBWindowStore
return bytesValue;
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo);
return new WindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator();
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
@@ -92,7 +92,7 @@ public class RocksDBWindowStore
return new WindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 8afd302..1548e7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.time.Duration;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -36,6 +35,7 @@ import org.apache.kafka.test.MockKeyValueStore;
import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
@@ -171,12 +171,16 @@ public class AbstractProcessorContextTest {
@SuppressWarnings("unchecked")
@Test
public void appConfigsShouldReturnParsedValues() {
- assertThat((Class<RocksDBConfigSetter>) context.appConfigs().get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG), equalTo(RocksDBConfigSetter.class));
+ assertThat(
+ context.appConfigs().get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG),
+ equalTo(RocksDBConfigSetter.class));
}
@Test
public void appConfigsShouldReturnUnrecognizedValues() {
- assertThat((String) context.appConfigs().get("user.supplied.config"), equalTo("user-suppplied-value"));
+ assertThat(
+ context.appConfigs().get("user.supplied.config"),
+ equalTo("user-suppplied-value"));
}
@@ -198,9 +202,11 @@ public class AbstractProcessorContextTest {
return null;
}
- @SuppressWarnings("deprecation")
@Override
- public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+ @Deprecated
+ public Cancellable schedule(final long interval,
+ final PunctuationType type,
+ final Punctuator callback) {
return null;
}
@@ -217,12 +223,12 @@ public class AbstractProcessorContextTest {
@Override
public <K, V> void forward(final K key, final V value, final To to) {}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
index 03e79b7..c6b2cbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
@@ -47,11 +47,13 @@ public class ForwardingDisabledProcessorContextTest {
context.forward("key", "value", To.all());
}
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
@Test(expected = StreamsException.class)
public void shouldThrowOnForwardWithChildIndex() {
context.forward("key", "value", 1);
}
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
@Test(expected = StreamsException.class)
public void shouldThrowOnForwardWithChildName() {
context.forward("key", "value", "child1");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index deb14e9..4153cca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -106,11 +106,13 @@ public class GlobalProcessorContextImplTest {
globalContext.forward(null, null, To.all());
}
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
@Test(expected = UnsupportedOperationException.class)
public void shouldNotSupportForwardingViaChildIndex() {
globalContext.forward(null, null, 0);
}
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
@Test(expected = UnsupportedOperationException.class)
public void shouldNotSupportForwardingViaChildName() {
globalContext.forward(null, null, "processorName");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 6a7bd02..1e3fad3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -611,7 +611,7 @@ public class ProcessorTopologyTest {
this.numChildren = numChildren;
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
@Override
public void process(final String key, final String value) {
for (int i = 0; i != numChildren; ++i) {
@@ -631,7 +631,7 @@ public class ProcessorTopologyTest {
this.numChildren = numChildren;
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
@Override
public void process(final String key, final String value) {
for (int i = 0; i != numChildren; ++i) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 2f2587c..3b38b7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -45,7 +45,6 @@ public class RecordDeserializerTest {
new byte[0],
headers);
-
@SuppressWarnings("deprecation")
@Test
public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
@@ -81,7 +80,7 @@ public class RecordDeserializerTest {
final boolean valueThrowsException,
final Object key,
final Object value) {
- super("", Collections.<String>emptyList(), null, null);
+ super("", Collections.emptyList(), null, null);
this.keyThrowsException = keyThrowsException;
this.valueThrowsException = valueThrowsException;
this.key = key;
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index c9255ce..4b92679 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -159,7 +159,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
this.valSerde = valSerde;
}
- // serdes will override whatever specified in the configs
@Override
public Serde<?> keySerde() {
return keySerde;
@@ -179,7 +178,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
if (stateDir == null) {
throw new UnsupportedOperationException("State directory not specified");
}
-
return stateDir;
}
@@ -195,8 +193,8 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
return storeMap.get(name);
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
throw new UnsupportedOperationException("schedule() not supported.");
}
@@ -209,22 +207,24 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
}
@Override
- public void commit() { }
+ public void commit() {}
- @Override
@SuppressWarnings("unchecked")
+ @Override
public <K, V> void forward(final K key, final V value) {
forward(key, value, To.all());
}
+ @SuppressWarnings("unchecked")
@Override
- @SuppressWarnings({"unchecked", "deprecation"})
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
}
+ @SuppressWarnings("unchecked")
@Override
- @SuppressWarnings({"unchecked", "deprecation"})
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
forward(key, value, To.child(childName));
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 62a8491..5ae97c9 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -63,12 +63,8 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
}
@Override
- public void initialize() {
-
- }
+ public void initialize() {}
@Override
- public void uninitialize() {
-
- }
+ public void uninitialize() {}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index c7c8343..77dd418 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.test;
-import java.time.Duration;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
@@ -29,19 +28,21 @@ import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class NoOpProcessorContext extends AbstractProcessorContext {
public boolean initialized;
+ @SuppressWarnings("WeakerAccess")
public Map<Object, Object> forwardedValues = new HashMap<>();
public NoOpProcessorContext() {
super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null);
}
- static StreamsConfig streamsConfig() {
+ private static StreamsConfig streamsConfig() {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "boot");
@@ -53,9 +54,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
return null;
}
- @SuppressWarnings("deprecation")
@Override
- public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+ @Deprecated
+ public Cancellable schedule(final long interval,
+ final PunctuationType type,
+ final Punctuator callback) {
return null;
}
@@ -76,21 +79,20 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
forwardedValues.put(key, value);
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
forward(key, value);
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
forward(key, value);
}
@Override
- public void commit() {
- }
+ public void commit() {}
@Override
public void initialize() {
@@ -99,7 +101,5 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
@Override
public void register(final StateStore store,
- final StateRestoreCallback stateRestoreCallback) {
- // no-op
- }
+ final StateRestoreCallback stateRestoreCallback) {}
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 7b4c58b..34a7ed9 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -275,7 +275,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* @param timestamp A record timestamp
*/
@SuppressWarnings({"WeakerAccess", "unused"})
- public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) {
+ public void setRecordMetadata(final String topic,
+ final int partition,
+ final long offset,
+ final Headers headers,
+ final long timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
@@ -390,7 +394,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
@Override
@Deprecated
- public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+ public Cancellable schedule(final long intervalMs,
+ final PunctuationType type,
+ final Punctuator callback) {
final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback);
punctuators.add(capturedPunctuator);
@@ -398,7 +404,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
return capturedPunctuator::cancel;
}
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
@@ -433,8 +439,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
);
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException(
"Forwarding to a child by index is deprecated. " +
@@ -442,8 +448,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
);
}
- @SuppressWarnings("deprecation")
@Override
+ @Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException(
"Forwarding to a child by name is deprecated. " +