You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/02/12 04:26:51 UTC
[kafka] branch trunk updated: KAFKA-12939: After migrating processors, search the codebase for missed migrations (#11534)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9931036 KAFKA-12939: After migrating processors, search the codebase for missed migrations (#11534)
9931036 is described below
commit 99310360a589ccb28f7636b714e98e2e7c5a5110
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Sat Feb 12 04:25:03 2022 +0000
KAFKA-12939: After migrating processors, search the codebase for missed migrations (#11534)
Migrated internal usages that had previously been marked with TODO suppressions.
Reviewer: John Roesler <vv...@apache.org>
---
.../kstream/internals/KStreamFlatTransform.java | 23 ++++--
.../internals/KStreamFlatTransformValues.java | 23 ++++--
.../kstream/internals/KStreamTransformValues.java | 29 ++++---
.../processor/internals/RecordDeserializer.java | 9 +-
.../streams/processor/internals/RecordQueue.java | 9 +-
.../kafka/streams/state/ValueAndTimestamp.java | 4 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 32 +++++---
.../integration/GlobalKTableIntegrationTest.java | 7 +-
.../integration/GlobalThreadShutDownOrderTest.java | 16 ++--
.../integration/RangeQueryIntegrationTest.java | 55 ++++++-------
.../integration/RestoreIntegrationTest.java | 25 ++----
.../integration/StoreUpgradeIntegrationTest.java | 73 +++++++----------
.../integration/TaskMetadataIntegrationTest.java | 10 +--
.../kstream/RepartitionTopicNamingTest.java | 81 +++++++++++-------
.../kstream/internals/AbstractStreamTest.java | 20 +++--
.../kstream/internals/GlobalKTableJoinsTest.java | 11 +--
.../kstream/internals/KGroupedStreamImplTest.java | 92 ++++++++++-----------
.../kstream/internals/KStreamFilterTest.java | 9 +-
.../kstream/internals/KStreamFlatMapTest.java | 7 +-
.../internals/KStreamFlatMapValuesTest.java | 8 +-
.../internals/KStreamFlatTransformTest.java | 21 ++---
.../internals/KStreamFlatTransformValuesTest.java | 21 ++---
.../internals/KStreamGlobalKTableJoinTest.java | 9 +-
.../internals/KStreamGlobalKTableLeftJoinTest.java | 9 +-
.../streams/kstream/internals/KStreamImplTest.java | 9 +-
.../kstream/internals/KStreamKStreamJoinTest.java | 60 +++++++-------
.../internals/KStreamKStreamLeftJoinTest.java | 56 ++++++-------
.../internals/KStreamKStreamOuterJoinTest.java | 51 ++++++------
.../kstream/internals/KStreamKTableJoinTest.java | 9 +-
.../internals/KStreamKTableLeftJoinTest.java | 9 +-
.../streams/kstream/internals/KStreamMapTest.java | 5 +-
.../kstream/internals/KStreamMapValuesTest.java | 6 +-
.../kstream/internals/KStreamSelectKeyTest.java | 6 +-
.../KStreamSlidingWindowAggregateTest.java | 62 ++++++--------
.../internals/KStreamTransformValuesTest.java | 12 +--
.../internals/KStreamWindowAggregateTest.java | 25 +++---
.../kstream/internals/KTableFilterTest.java | 7 +-
.../streams/kstream/internals/KTableImplTest.java | 14 ++--
.../kstream/internals/KTableMapKeysTest.java | 5 +-
.../kstream/internals/KTableMapValuesTest.java | 9 +-
.../kstream/internals/KTableSourceTest.java | 4 +-
.../internals/SessionWindowedKStreamImplTest.java | 11 ++-
.../internals/SlidingWindowedKStreamImplTest.java | 11 ++-
.../internals/TimeWindowedKStreamImplTest.java | 11 ++-
.../internals/graph/GraphGraceSearchUtilTest.java | 29 +++----
.../internals/graph/TableProcessorNodeTest.java | 10 ++-
.../processor/internals/ProcessorNodeTest.java | 52 +++++-------
.../processor/internals/PunctuationQueueTest.java | 48 ++---------
.../internals/RepartitionOptimizingTest.java | 12 ++-
.../processor/internals/StreamThreadTest.java | 23 +++---
.../apache/kafka/streams/tests/EosTestClient.java | 1 -
.../kafka/streams/tests/SmokeTestClient.java | 12 ++-
.../apache/kafka/streams/tests/SmokeTestUtil.java | 95 +++++++++++-----------
.../kafka/streams/tests/StreamsUpgradeTest.java | 1 -
54 files changed, 592 insertions(+), 676 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
index 4d4fd2b..dc66cf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
@@ -19,12 +19,17 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {
+public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
private final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
@@ -33,7 +38,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.ka
}
@Override
- public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
+ public Processor<KIn, VIn, KOut, VOut> get() {
return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
}
@@ -42,7 +47,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.ka
return transformerSupplier.stores();
}
- public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {
+ public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends ContextualProcessor<KIn, VIn, KOut, VOut> {
private final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;
@@ -51,17 +56,17 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.ka
}
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final ProcessorContext<KOut, VOut> context) {
super.init(context);
- transformer.init(context);
+ transformer.init((InternalProcessorContext<KOut, VOut>) context);
}
@Override
- public void process(final KIn key, final VIn value) {
- final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(key, value);
+ public void process(final Record<KIn, VIn> record) {
+ final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(record.key(), record.value());
if (pairs != null) {
for (final KeyValue<KOut, VOut> pair : pairs) {
- context().forward(pair.key, pair.value);
+ context().forward(record.withKey(pair.key).withValue(pair.value));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
index baf44c7..5469c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
@@ -18,13 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {
+public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;
@@ -33,7 +38,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.ka
}
@Override
- public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
+ public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@@ -42,7 +47,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.ka
return valueTransformerSupplier.stores();
}
- public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {
+ public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {
private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
@@ -51,17 +56,17 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.ka
}
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final ProcessorContext<KIn, VOut> context) {
super.init(context);
- valueTransformer.init(new ForwardingDisabledProcessorContext(context));
+ valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>) context));
}
@Override
- public void process(final KIn key, final VIn value) {
- final Iterable<VOut> transformedValues = valueTransformer.transform(key, value);
+ public void process(final Record<KIn, VIn> record) {
+ final Iterable<VOut> transformedValues = valueTransformer.transform(record.key(), record.value());
if (transformedValues != null) {
for (final VOut transformedValue : transformedValues) {
- context.forward(key, transformedValue);
+ context().forward(record.withValue(transformedValue));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 468bf8d..1b767ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -18,22 +18,27 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamTransformValues<K, V, R> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+public class KStreamTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
- private final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
+ private final ValueTransformerWithKeySupplier<KIn, VIn, VOut> valueTransformerSupplier;
- KStreamTransformValues(final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) {
+ KStreamTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, VOut> valueTransformerSupplier) {
this.valueTransformerSupplier = valueTransformerSupplier;
}
@Override
- public org.apache.kafka.streams.processor.Processor<K, V> get() {
+ public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@@ -42,23 +47,23 @@ public class KStreamTransformValues<K, V, R> implements org.apache.kafka.streams
return valueTransformerSupplier.stores();
}
- public static class KStreamTransformValuesProcessor<K, V, R> extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+ public static class KStreamTransformValuesProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {
- private final ValueTransformerWithKey<K, V, R> valueTransformer;
+ private final ValueTransformerWithKey<KIn, VIn, VOut> valueTransformer;
- KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) {
+ KStreamTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, VOut> valueTransformer) {
this.valueTransformer = valueTransformer;
}
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final ProcessorContext<KIn, VOut> context) {
super.init(context);
- valueTransformer.init(new ForwardingDisabledProcessorContext(context));
+ valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>) context));
}
@Override
- public void process(final K key, final V value) {
- context.forward(key, valueTransformer.transform(key, value));
+ public void process(final Record<KIn, VIn> record) {
+ context().forward(record.withValue(valueTransformer.transform(record.key(), record.value())));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index b5c821a..6ffe955 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
import java.util.Optional;
@@ -50,7 +50,7 @@ class RecordDeserializer {
* {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL}
* or throws an exception itself
*/
- ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
+ ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processorContext,
final ConsumerRecord<byte[], byte[]> rawRecord) {
try {
@@ -70,7 +70,10 @@ class RecordDeserializer {
} catch (final Exception deserializationException) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response;
try {
- response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException);
+ response = deserializationExceptionHandler.handle(
+ (InternalProcessorContext<?, ?>) processorContext,
+ rawRecord,
+ deserializationException);
} catch (final Exception fatalUserException) {
log.error(
"Deserialization error callback failed after deserialization error for record {}",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index db27f0b..1c01966 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
@@ -41,7 +41,7 @@ public class RecordQueue {
private final Logger log;
private final SourceNode<?, ?> source;
private final TopicPartition partition;
- private final ProcessorContext processorContext;
+ private final ProcessorContext<?, ?> processorContext;
private final TimestampExtractor timestampExtractor;
private final RecordDeserializer recordDeserializer;
private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
@@ -55,7 +55,7 @@ public class RecordQueue {
final SourceNode<?, ?> source,
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
- final InternalProcessorContext processorContext,
+ final InternalProcessorContext<?, ?> processorContext,
final LogContext logContext) {
this.source = source;
this.partition = partition;
@@ -175,7 +175,8 @@ public class RecordQueue {
while (headRecord == null && !fifoQueue.isEmpty()) {
final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
- final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw);
+ final ConsumerRecord<Object, Object> deserialized =
+ recordDeserializer.deserialize(processorContext, raw);
if (deserialized == null) {
// this only happens if the deserializer decides to skip. It has already logged the reason.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index f5fc7a2..1cf7277 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -37,12 +37,12 @@ public final class ValueAndTimestamp<V> {
}
/**
- * Create a new {@link ValueAndTimestamp} instance if the provide {@code value} is not {@code null}.
+ * Create a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null}.
*
* @param value the value
* @param timestamp the timestamp
* @param <V> the type of the value
- * @return a new {@link ValueAndTimestamp} instance if the provide {@code value} is not {@code null};
+ * @return a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null};
* otherwise {@code null} is returned
*/
public static <V> ValueAndTimestamp<V> make(final V value,
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0113e56..06854db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -45,11 +45,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
@@ -77,7 +76,6 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class StreamsBuilderTest {
private static final String STREAM_TOPIC = "stream-topic";
@@ -301,7 +299,7 @@ public class StreamsBuilderTest {
final KStream<String, String> source = builder.stream("topic-source");
source.to("topic-sink");
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
source.process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -321,10 +319,10 @@ public class StreamsBuilderTest {
final KStream<String, String> source = builder.stream("topic-source");
final KStream<String, String> through = source.through("topic-sink");
- final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> sourceProcessorSupplier = new MockApiProcessorSupplier<>();
source.process(sourceProcessorSupplier);
- final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> throughProcessorSupplier = new MockApiProcessorSupplier<>();
through.process(throughProcessorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -342,10 +340,10 @@ public class StreamsBuilderTest {
final KStream<String, String> source = builder.stream("topic-source");
final KStream<String, String> through = source.repartition();
- final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> sourceProcessorSupplier = new MockApiProcessorSupplier<>();
source.process(sourceProcessorSupplier);
- final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> throughProcessorSupplier = new MockApiProcessorSupplier<>();
through.process(throughProcessorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -367,7 +365,7 @@ public class StreamsBuilderTest {
final KStream<String, String> source2 = builder.stream(topic2);
final KStream<String, String> merged = source1.merge(source2);
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
merged.process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -707,6 +705,7 @@ public class StreamsBuilderTest {
STREAM_OPERATION_NAME);
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabledViaOldApi() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
@@ -798,12 +797,16 @@ public class StreamsBuilderTest {
STREAM_OPERATION_NAME + "-merge");
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKStream() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
- streamOne.join(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
+ streamOne.join(streamTwo,
+ (value1, value2) -> value1,
+ JoinWindows.of(Duration.ofHours(1)),
+ StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
@@ -821,12 +824,17 @@ public class StreamsBuilderTest {
STREAM_OPERATION_NAME + "-merge");
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldUseGeneratedNameForJoinOperationBetweenKStreamAndKStream() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
- streamOne.join(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(STREAM_OPERATION_NAME));
+ streamOne.join(streamTwo,
+ (value1, value2) -> value1,
+ JoinWindows.of(Duration.ofHours(1)),
+ StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
+ .withName(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
@@ -920,7 +928,7 @@ public class StreamsBuilderTest {
@Test
public void shouldUseSpecifiedNameForProcessOperation() {
builder.stream(STREAM_TOPIC)
- .process(new MockProcessorSupplier<>(), Named.as("test-processor"));
+ .process(new MockApiProcessorSupplier<>(), Named.as("test-processor"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index fa2b3b3..90dc9e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
@@ -67,7 +67,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNotNull;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
@@ -96,7 +95,7 @@ public class GlobalKTableIntegrationTest {
private String streamTopic;
private GlobalKTable<Long, String> globalTable;
private KStream<String, Long> stream;
- private MockProcessorSupplier<String, String> supplier;
+ private MockApiProcessorSupplier<String, String, Void, Void> supplier;
@Rule
public TestName testName = new TestName();
@@ -119,7 +118,7 @@ public class GlobalKTableIntegrationTest {
.withValueSerde(Serdes.String()));
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
stream = builder.stream(streamTopic, stringLongConsumed);
- supplier = new MockProcessorSupplier<>();
+ supplier = new MockApiProcessorSupplier<>();
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 6d89325..98dec87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -30,7 +30,9 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@@ -107,7 +109,6 @@ public class GlobalThreadShutDownOrderTest {
@Rule
public TestName testName = new TestName();
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void before() throws Exception {
builder = new StreamsBuilder();
@@ -196,8 +197,7 @@ public class GlobalThreadShutDownOrderTest {
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private class GlobalStoreProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, Long> {
+ private class GlobalStoreProcessor implements Processor<String, Long, Void, Void> {
private KeyValueStore<String, Long> store;
private final String storeName;
@@ -207,14 +207,12 @@ public class GlobalThreadShutDownOrderTest {
}
@Override
- @SuppressWarnings("unchecked")
- public void init(final ProcessorContext context) {
- super.init(context);
- store = (KeyValueStore<String, Long>) context.getStateStore(storeName);
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(storeName);
}
@Override
- public void process(final String key, final Long value) {
+ public void process(final Record<String, Long> record) {
firstRecordProcessed = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index aabf6e2..1c22bfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
+import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,7 +29,6 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -65,12 +65,10 @@ import java.util.function.Supplier;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class RangeQueryIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+ private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
private static final Properties STREAMS_CONFIG = new Properties();
private static final String APP_ID = "range-query-integration-test";
private static final Long COMMIT_INTERVAL = 100L;
@@ -78,14 +76,14 @@ public class RangeQueryIntegrationTest {
private static final String TABLE_NAME = "mytable";
private static final int DATA_SIZE = 5;
- private enum StoreType { InMemory, RocksDB, Timed };
- private StoreType storeType;
- private boolean enableLogging;
- private boolean enableCaching;
- private boolean forward;
- private KafkaStreams kafkaStreams;
+ private enum StoreType { InMemory, RocksDB, Timed }
+
+ private final StoreType storeType;
+ private final boolean enableLogging;
+ private final boolean enableCaching;
+ private final boolean forward;
+ private final LinkedList<KeyValue<String, String>> records;
- private LinkedList<KeyValue<String, String>> records;
private String low;
private String high;
private String middle;
@@ -176,13 +174,11 @@ public class RangeQueryIntegrationTest {
@Test
public void testStoreConfig() throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
- final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, TABLE_NAME, enableLogging, enableCaching);
- final KTable<String, String> table = builder.table(inputStream, stateStoreConfig);
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, enableLogging, enableCaching);
+ builder.table(inputStream, stateStoreConfig);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) {
- final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams);
-
- IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
writeInputData();
@@ -221,32 +217,29 @@ public class RangeQueryIntegrationTest {
}
private List<KeyValue<String, String>> filterList(final KeyValueIterator<String, String> iterator, final String from, final String to) {
- final Predicate<KeyValue<String, String>> pred = new Predicate<KeyValue<String, String>>() {
- @Override
- public boolean test(final KeyValue<String, String> elem) {
- if (from != null && elem.key.compareTo(from) < 0) {
- return false;
- }
- if (to != null && elem.key.compareTo(to) > 0) {
- return false;
- }
- return elem != null;
+ final Predicate<KeyValue<String, String>> predicate = elem -> {
+ if (from != null && elem.key.compareTo(from) < 0) {
+ return false;
+ }
+ if (to != null && elem.key.compareTo(to) > 0) {
+ return false;
}
+ return elem != null;
};
- return Utils.toList(iterator, pred);
+ return Utils.toList(iterator, predicate);
}
private void testRange(final String name, final ReadOnlyKeyValueStore<String, String> store, final String from, final String to, final boolean forward) {
try (final KeyValueIterator<String, String> resultIterator = forward ? store.range(from, to) : store.reverseRange(from, to);
- final KeyValueIterator<String, String> expectedIterator = forward ? store.all() : store.reverseAll();) {
+ final KeyValueIterator<String, String> expectedIterator = forward ? store.all() : store.reverseAll()) {
final List<KeyValue<String, String>> result = Utils.toList(resultIterator);
final List<KeyValue<String, String>> expected = filterList(expectedIterator, from, to);
- assertThat(result, is(expected));
+ assertThat(name, result, is(expected));
}
}
- private Materialized<String, String, KeyValueStore<Bytes, byte[]>> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) {
+ private Materialized<String, String, KeyValueStore<Bytes, byte[]>> getStoreConfig(final StoreType type, final boolean cachingEnabled, final boolean loggingEnabled) {
final Supplier<KeyValueBytesStoreSupplier> createStore = () -> {
if (type == StoreType.InMemory) {
return Stores.inMemoryKeyValueStore(TABLE_NAME);
@@ -270,7 +263,7 @@ public class RangeQueryIntegrationTest {
stateStoreConfig.withCachingDisabled();
}
if (loggingEnabled) {
- stateStoreConfig.withLoggingEnabled(new HashMap<String, String>());
+ stateStoreConfig.withLoggingEnabled(new HashMap<>());
} else {
stateStoreConfig.withLoggingDisabled();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 1bff14d..2c0e180 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -40,9 +40,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.TrackingS
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -291,10 +293,8 @@ public class RestoreIntegrationTest {
assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException {
-
IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
asList(KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
@@ -430,9 +430,7 @@ public class RestoreIntegrationTest {
}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- public static class KeyValueStoreProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
-
+ public static class KeyValueStoreProcessor implements Processor<Integer, Integer, Void, Void> {
private final String topic;
private final CountDownLatch processorLatch;
@@ -443,22 +441,18 @@ public class RestoreIntegrationTest {
this.processorLatch = processorLatch;
}
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
- this.store = (KeyValueStore<Integer, Integer>) context.getStateStore(topic);
+ public void init(final ProcessorContext<Void, Void> context) {
+ this.store = context.getStateStore(topic);
}
@Override
- public void process(final Integer key, final Integer value) {
- if (key != null) {
- store.put(key, value);
+ public void process(final Record<Integer, Integer> record) {
+ if (record.key() != null) {
+ store.put(record.key(), record.value());
processorLatch.countDown();
}
}
-
- @Override
- public void close() { }
}
private void createStateForRestoration(final String changelogTopic, final int startingOffset) {
@@ -499,5 +493,4 @@ public class RestoreIntegrationTest {
consumer.commitSync();
consumer.close();
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 9c6085f..6c6fc5d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -27,7 +27,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -59,7 +61,6 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Category({IntegrationTest.class})
public class StoreUpgradeIntegrationTest {
private static final String STORE_NAME = "store";
@@ -953,124 +954,112 @@ public class StoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class KeyValueProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
+ private static class KeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
private KeyValueStore<Integer, Long> store;
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
- store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME);
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
}
@Override
- public void process(final Integer key, final Integer value) {
+ public void process(final Record<Integer, Integer> record) {
final long newCount;
- final Long oldCount = store.get(key);
+ final Long oldCount = store.get(record.key());
if (oldCount != null) {
newCount = oldCount + 1L;
} else {
newCount = 1L;
}
- store.put(key, newCount);
+ store.put(record.key(), newCount);
}
@Override
public void close() {}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class TimestampedKeyValueProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
- private ProcessorContext context;
+ private static class TimestampedKeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
private TimestampedKeyValueStore<Integer, Long> store;
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
- this.context = context;
- store = (TimestampedKeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME);
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
}
@Override
- public void process(final Integer key, final Integer value) {
+ public void process(final Record<Integer, Integer> record) {
final long newCount;
- final ValueAndTimestamp<Long> oldCountWithTimestamp = store.get(key);
+ final ValueAndTimestamp<Long> oldCountWithTimestamp = store.get(record.key());
final long newTimestamp;
if (oldCountWithTimestamp == null) {
newCount = 1L;
- newTimestamp = context.timestamp();
+ newTimestamp = record.timestamp();
} else {
newCount = oldCountWithTimestamp.value() + 1L;
- newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), context.timestamp());
+ newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), record.timestamp());
}
- store.put(key, ValueAndTimestamp.make(newCount, newTimestamp));
+ store.put(record.key(), ValueAndTimestamp.make(newCount, newTimestamp));
}
@Override
public void close() {}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class WindowedProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
+ private static class WindowedProcessor implements Processor<Integer, Integer, Void, Void> {
private WindowStore<Integer, Long> store;
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
- store = (WindowStore<Integer, Long>) context.getStateStore(STORE_NAME);
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
}
@Override
- public void process(final Integer key, final Integer value) {
+ public void process(final Record<Integer, Integer> record) {
final long newCount;
- final Long oldCount = store.fetch(key, key < 10 ? 0L : 100000L);
+ final Long oldCount = store.fetch(record.key(), record.key() < 10 ? 0L : 100000L);
if (oldCount != null) {
newCount = oldCount + 1L;
} else {
newCount = 1L;
}
- store.put(key, newCount, key < 10 ? 0L : 100000L);
+ store.put(record.key(), newCount, record.key() < 10 ? 0L : 100000L);
}
@Override
public void close() {}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class TimestampedWindowedProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
- private ProcessorContext context;
+ private static class TimestampedWindowedProcessor implements Processor<Integer, Integer, Void, Void> {
private TimestampedWindowStore<Integer, Long> store;
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
- this.context = context;
- store = (TimestampedWindowStore<Integer, Long>) context.getStateStore(STORE_NAME);
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
}
@Override
- public void process(final Integer key, final Integer value) {
+ public void process(final Record<Integer, Integer> record) {
final long newCount;
- final ValueAndTimestamp<Long> oldCountWithTimestamp = store.fetch(key, key < 10 ? 0L : 100000L);
+ final ValueAndTimestamp<Long> oldCountWithTimestamp = store.fetch(record.key(), record.key() < 10 ? 0L : 100000L);
final long newTimestamp;
if (oldCountWithTimestamp == null) {
newCount = 1L;
- newTimestamp = context.timestamp();
+ newTimestamp = record.timestamp();
} else {
newCount = oldCountWithTimestamp.value() + 1L;
- newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), context.timestamp());
+ newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), record.timestamp());
}
- store.put(key, ValueAndTimestamp.make(newCount, newTimestamp), key < 10 ? 0L : 100000L);
+ store.put(record.key(), ValueAndTimestamp.make(newCount, newTimestamp), record.key() < 10 ? 0L : 100000L);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 6f35d12..2aec4ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -28,6 +28,8 @@ import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -83,7 +85,6 @@ public class TaskMetadataIntegrationTest {
private AtomicBoolean process;
private AtomicBoolean commit;
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setup() {
final String testId = safeUniqueTestName(getClass(), testName);
@@ -185,18 +186,15 @@ public class TaskMetadataIntegrationTest {
timestamp);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private class PauseProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+ private class PauseProcessor extends ContextualProcessor<String, String, Void, Void> {
@Override
- public void process(final String key, final String value) {
+ public void process(final Record<String, String> record) {
while (!process.get()) {
try {
wait(100);
} catch (final InterruptedException e) {
-
}
}
- context().forward(key, value);
if (commit.get()) {
context().commit();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index cad978c..b6d3206 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.junit.Test;
import java.time.Duration;
@@ -38,7 +40,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-@SuppressWarnings("deprecation")
public class RepartitionTopicNamingTest {
private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k + v;
@@ -104,8 +105,8 @@ public class RepartitionTopicNamingTest {
.selectKey((k, v) -> k)
.groupByKey(Grouped.as("grouping"));
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count().toStream().to("output-one");
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
final String topologyString = builder.build().describe().toString();
assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
@@ -119,11 +120,11 @@ public class RepartitionTopicNamingTest {
.selectKey((k, v) -> k)
.groupByKey(Grouped.as("grouping"));
- final TimeWindowedKStream<String, String> timeWindowedKStream = kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L)));
+ final TimeWindowedKStream<String, String> timeWindowedKStream = kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)));
timeWindowedKStream.count().toStream().to("output-one");
timeWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two");
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
final String topologyString = builder.build().describe().toString();
assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
@@ -137,11 +138,11 @@ public class RepartitionTopicNamingTest {
.selectKey((k, v) -> k)
.groupByKey(Grouped.as("grouping"));
- final SessionWindowedKStream<String, String> sessionWindowedKStream = kGroupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(10L)));
+ final SessionWindowedKStream<String, String> sessionWindowedKStream = kGroupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)));
sessionWindowedKStream.count().toStream().to("output-one");
sessionWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two");
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
final String topologyString = builder.build().describe().toString();
assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
@@ -166,8 +167,8 @@ public class RepartitionTopicNamingTest {
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
.selectKey((k, v) -> k)
.groupByKey();
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count().toStream().to("output-one");
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
final String topologyString = builder.build().describe().toString();
assertThat(2, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
}
@@ -188,8 +189,8 @@ public class RepartitionTopicNamingTest {
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
.selectKey((k, v) -> k)
.groupByKey(Grouped.as("grouping"));
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count();
final Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final Topology topology = builder.build(properties);
@@ -207,10 +208,10 @@ public class RepartitionTopicNamingTest {
final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k);
final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2,
- JoinWindows.of(Duration.ofMillis(30L)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(30L)),
StreamJoined.<String, String, String>as("join-store").withName("join-repartition"));
- joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)),
+ joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(30L)),
StreamJoined.<String, String, String>as("join-store").withName("join-repartition"));
builder.build();
@@ -228,8 +229,8 @@ public class RepartitionTopicNamingTest {
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
.selectKey((k, v) -> k)
.groupByKey(Grouped.as("grouping"));
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
- kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();
+ kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count();
builder.build(properties);
}
@@ -363,15 +364,25 @@ public class RepartitionTopicNamingTest {
if (isGroupByKey) {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+ selectKeyStream.filter((k, v) -> true)
+ .mapValues(v -> v)
+ .groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
+ .count();
} else {
- selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+ selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
+ .count();
}
} else {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).count();
+ selectKeyStream.filter((k, v) -> true)
+ .mapValues(v -> v)
+ .groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName))
+ .count();
} else {
- selectKeyStream.groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).count();
+ selectKeyStream.groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName))
+ .count();
}
}
@@ -388,15 +399,27 @@ public class RepartitionTopicNamingTest {
final String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
if (isGroupByKey) {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+ selectKeyStream.filter((k, v) -> true)
+ .mapValues(v -> v)
+ .groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+ .count();
} else {
- selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+ selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+ .count();
}
} else {
if (otherOperations) {
- selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+ selectKeyStream.filter((k, v) -> true)
+ .mapValues(v -> v)
+ .groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+ .count();
} else {
- selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+ selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+ .count();
}
}
@@ -446,7 +469,7 @@ public class RepartitionTopicNamingTest {
}
final String joinRepartitionTopicName = "my-join";
- updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(1000L)),
+ updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1000L)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(joinRepartitionTopicName));
return builder.build().describe().toString();
@@ -463,7 +486,6 @@ public class RepartitionTopicNamingTest {
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private Topology buildTopology(final String optimizationConfig) {
final Initializer<Integer> initializer = () -> 0;
final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
@@ -495,7 +517,7 @@ public class RepartitionTopicNamingTest {
mappedStream.filter((k, v) -> k.equals("A"))
.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
- JoinWindows.of(Duration.ofMillis(5000L)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(5000L)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()).withStoreName(fourthRepartitionTopicName).withName(fourthRepartitionTopicName))
.to(JOINED_TOPIC);
@@ -506,8 +528,7 @@ public class RepartitionTopicNamingTest {
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class SimpleProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+ private static class SimpleProcessor implements Processor<String, String, Void, Void> {
final List<String> valueList;
@@ -516,8 +537,8 @@ public class RepartitionTopicNamingTest {
}
@Override
- public void process(final String key, final String value) {
- valueList.add(value);
+ public void process(final Record<String, String> record) {
+ valueList.add(record.value());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index ae7ef8f..9920e4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -19,6 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.streams.StreamsBuilder;
@@ -30,7 +35,6 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
import java.util.Random;
@@ -41,7 +45,6 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class AbstractStreamTest {
@Test
@@ -69,12 +72,11 @@ public class AbstractStreamTest {
verify(valueTransformerWithKeySupplier);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testShouldBeExtensible() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
final String topicName = "topic";
final ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())));
@@ -108,7 +110,7 @@ public class AbstractStreamTest {
}
}
- private static class ExtendedKStreamDummy<K, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+ private static class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V, K, V> {
private final Random rand;
@@ -117,16 +119,16 @@ public class AbstractStreamTest {
}
@Override
- public org.apache.kafka.streams.processor.Processor<K, V> get() {
+ public Processor<K, V, K, V> get() {
return new ExtendedKStreamDummyProcessor();
}
- private class ExtendedKStreamDummyProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+ private class ExtendedKStreamDummyProcessor extends ContextualProcessor<K, V, K, V> {
@Override
- public void process(final K key, final V value) {
+ public void process(final Record<K, V> record) {
// flip a coin and filter
if (rand.nextBoolean()) {
- context().forward(key, value);
+ context().forward(record);
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index e4d95e0..cb0218b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -38,7 +38,6 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
-
public class GlobalKTableJoinsTest {
private final StreamsBuilder builder = new StreamsBuilder();
@@ -56,10 +55,9 @@ public class GlobalKTableJoinsTest {
keyValueMapper = (key, value) -> value;
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void shouldLeftJoinWithStream() {
- final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream
.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
.process(supplier);
@@ -72,10 +70,9 @@ public class GlobalKTableJoinsTest {
verifyJoin(expected, supplier);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void shouldInnerJoinWithStream() {
- final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream
.join(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
.process(supplier);
@@ -88,7 +85,7 @@ public class GlobalKTableJoinsTest {
}
private void verifyJoin(final Map<String, ValueAndTimestamp<String>> expected,
- final MockProcessorSupplier<String, String> supplier) {
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier) {
final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 333884e..5025e7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -40,8 +40,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -61,7 +61,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KGroupedStreamImplTest {
private static final String TOPIC = "topic";
@@ -95,7 +94,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotHaveNullReducerWithWindowedReduce() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
.reduce(null, Materialized.as("store")));
}
@@ -107,7 +106,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
assertThrows(TopologyException.class, () -> groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
}
@@ -132,14 +131,14 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotHaveNullInitializerOnWindowedAggregate() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
}
@Test
public void shouldNotHaveNullAdderOnWindowedAggregate() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
}
@@ -151,14 +150,14 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
assertThrows(TopologyException.class, () -> groupedStream
- .windowedBy(TimeWindows.of(ofMillis(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
}
@Test
public void shouldNotHaveNullReducerWithSlidingWindowedReduce() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.reduce(null, Materialized.as("store")));
}
@@ -170,36 +169,36 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotHaveInvalidStoreNameWithSlidingWindowedReduce() {
assertThrows(TopologyException.class, () -> groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
}
@Test
public void shouldNotHaveNullInitializerOnSlidingWindowedAggregate() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
}
@Test
public void shouldNotHaveNullAdderOnSlidingWindowedAggregate() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
}
@Test
public void shouldNotHaveInvalidStoreNameOnSlidingWindowedAggregate() {
assertThrows(TopologyException.class, () -> groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
}
@Test
public void shouldCountSlidingWindows() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
.count(Materialized.as("aggregate-by-key-windowed"))
.toStream()
.process(supplier);
@@ -209,9 +208,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountSlidingWindowsWithInternalStoreName() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
.count()
.toStream()
.process(supplier);
@@ -219,7 +218,7 @@ public class KGroupedStreamImplTest {
doCountSlidingWindows(supplier);
}
- private void doCountSlidingWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
+ private void doCountSlidingWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -314,7 +313,7 @@ public class KGroupedStreamImplTest {
)));
}
- private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) {
+ private void doAggregateSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -340,9 +339,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAggregateSessionWindows() {
- final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KTable<Windowed<String>, Integer> table = groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.aggregate(
() -> 0,
(aggKey, value, aggregate) -> aggregate + 1,
@@ -358,9 +357,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAggregateSessionWindowsWithInternalStoreName() {
- final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KTable<Windowed<String>, Integer> table = groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.aggregate(
() -> 0,
(aggKey, value, aggregate) -> aggregate + 1,
@@ -371,8 +370,7 @@ public class KGroupedStreamImplTest {
doAggregateSessionWindows(supplier);
}
- private void doCountSessionWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
-
+ private void doCountSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -398,9 +396,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountSessionWindows() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KTable<Windowed<String>, Long> table = groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.count(Materialized.as("session-store"));
table.toStream().process(supplier);
doCountSessionWindows(supplier);
@@ -409,16 +407,16 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountSessionWindowsWithInternalStoreName() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KTable<Windowed<String>, Long> table = groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.count();
table.toStream().process(supplier);
doCountSessionWindows(supplier);
assertNull(table.queryableStoreName());
}
- private void doReduceSessionWindows(final MockProcessorSupplier<Windowed<String>, String> supplier) {
+ private void doReduceSessionWindows(final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -444,9 +442,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldReduceSessionWindows() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KTable<Windowed<String>, String> table = groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
table.toStream().process(supplier);
doReduceSessionWindows(supplier);
@@ -455,9 +453,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KTable<Windowed<String>, String> table = groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.reduce((value1, value2) -> value1 + ":" + value2);
table.toStream().process(supplier);
doReduceSessionWindows(supplier);
@@ -467,7 +465,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.reduce(null, Materialized.as("store")));
}
@@ -479,7 +477,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
assertThrows(TopologyException.class, () -> groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME))
);
}
@@ -487,7 +485,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.reduce(null, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null))
);
}
@@ -495,7 +493,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.aggregate(null, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
);
}
@@ -503,7 +501,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
assertThrows(NullPointerException.class, () -> groupedStream.
- windowedBy(SessionWindows.with(ofMillis(30)))
+ windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.aggregate(MockInitializer.STRING_INIT, null, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
);
}
@@ -511,7 +509,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
assertThrows(NullPointerException.class, () -> groupedStream
- .windowedBy(SessionWindows.with(ofMillis(30)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as("storeName"))
);
}
@@ -524,7 +522,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
groupedStream
- .windowedBy(SessionWindows.with(ofMillis(10)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(10)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -535,7 +533,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
assertThrows(TopologyException.class, () -> groupedStream
- .windowedBy(SessionWindows.with(ofMillis(10)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(10)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as(INVALID_STORE_NAME))
);
}
@@ -677,7 +675,7 @@ public class KGroupedStreamImplTest {
@Test
public void shouldAggregateWithDefaultSerdes() {
- final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
groupedStream
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
.toStream()
@@ -710,7 +708,7 @@ public class KGroupedStreamImplTest {
inputTopic.pipeInput("3", (String) null);
}
- private void doCountWindowed(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
+ private void doCountWindowed(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -745,9 +743,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountWindowed() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMillis(100L)))
.count(Materialized.as("aggregate-by-key-windowed"))
.toStream()
.process(supplier);
@@ -757,9 +755,9 @@ public class KGroupedStreamImplTest {
@Test
public void shouldCountWindowedWithInternalStoreName() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMillis(100L)))
.count()
.toStream()
.process(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index bc3f461..c4e9de6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -40,14 +40,13 @@ public class KStreamFilterTest {
private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFilter() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
final KStream<Integer, String> stream;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filter(isMultipleOfThree).process(supplier);
@@ -62,14 +61,13 @@ public class KStreamFilterTest {
assertEquals(2, supplier.theCapturedProcessor().processed().size());
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFilterNot() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
final KStream<Integer, String> stream;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filterNot(isMultipleOfThree).process(supplier);
@@ -93,6 +91,5 @@ public class KStreamFilterTest {
.filter(numberKeyPredicate)
.filterNot(numberKeyPredicate)
.to("nirvana");
-
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 1f763a0..69bb32c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -45,7 +45,6 @@ import static org.junit.Assert.assertThrows;
public class KStreamFlatMapTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMap() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -63,9 +62,7 @@ public class KStreamFlatMapTest {
final int[] expectedKeys = {0, 1, 2, 3};
final KStream<Integer, String> stream;
- final MockProcessorSupplier<String, String> supplier;
-
- supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.flatMap(mapper).process(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index c1930e5..413b628 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -41,7 +41,6 @@ public class KStreamFlatMapValuesTest {
private final String topicName = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMapValues() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -57,7 +56,7 @@ public class KStreamFlatMapValuesTest {
final int[] expectedKeys = {0, 1, 2, 3};
final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.flatMapValues(mapper).process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -78,7 +77,6 @@ public class KStreamFlatMapValuesTest {
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMapValuesWithKeys() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -94,7 +92,7 @@ public class KStreamFlatMapValuesTest {
final int[] expectedKeys = {0, 1, 2, 3};
final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.flatMapValues(mapper).process(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 8082255..68fef1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -20,7 +20,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Before;
@@ -31,14 +33,13 @@ import java.util.Collections;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransformTest extends EasyMockSupport {
private Number inputKey;
private Number inputValue;
private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
- private ProcessorContext context;
+ private InternalProcessorContext<Integer, Integer> context;
private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
@@ -47,7 +48,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
inputKey = 1;
inputValue = 10;
transformer = mock(Transformer.class);
- context = strictMock(ProcessorContext.class);
+ context = strictMock(InternalProcessorContext.class);
processor = new KStreamFlatTransformProcessor<>(transformer);
}
@@ -72,11 +73,11 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
EasyMock.expect(transformer.transform(inputKey, inputValue)).andReturn(outputRecords);
for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
- context.forward(outputRecord.key, outputRecord.value);
+ context.forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
}
replayAll();
- processor.process(inputKey, inputValue);
+ processor.process(new Record<>(inputKey, inputValue, 0L));
verifyAll();
}
@@ -87,10 +88,10 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
EasyMock.reset(transformer);
EasyMock.expect(transformer.transform(inputKey, inputValue))
- .andReturn(Collections.<KeyValue<Integer, Integer>>emptyList());
+ .andReturn(Collections.emptyList());
replayAll();
- processor.process(inputKey, inputValue);
+ processor.process(new Record<>(inputKey, inputValue, 0L));
verifyAll();
}
@@ -104,7 +105,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
.andReturn(null);
replayAll();
- processor.process(inputKey, inputValue);
+ processor.process(new Record<>(inputKey, inputValue, 0L));
verifyAll();
}
@@ -129,7 +130,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
replayAll();
- final org.apache.kafka.streams.processor.Processor<Number, Number> processor = processorSupplier.get();
+ final Processor<Number, Number, Integer, Integer> processor = processorSupplier.get();
verifyAll();
assertTrue(processor instanceof KStreamFlatTransformProcessor);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index fd64604..fcb75e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -24,21 +24,22 @@ import java.util.Collections;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransformValuesTest extends EasyMockSupport {
private Integer inputKey;
private Integer inputValue;
private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
- private ProcessorContext context;
+ private InternalProcessorContext<Integer, String> context;
private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
@@ -47,7 +48,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
inputKey = 1;
inputValue = 10;
valueTransformer = mock(ValueTransformerWithKey.class);
- context = strictMock(ProcessorContext.class);
+ context = strictMock(InternalProcessorContext.class);
processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
}
@@ -72,11 +73,11 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues);
for (final String outputValue : outputValues) {
- context.forward(inputKey, outputValue);
+ context.forward(new Record<>(inputKey, outputValue, 0L));
}
replayAll();
- processor.process(inputKey, inputValue);
+ processor.process(new Record<>(inputKey, inputValue, 0L));
verifyAll();
}
@@ -86,10 +87,10 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
processor.init(context);
EasyMock.reset(valueTransformer);
- EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.<String>emptyList());
+ EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.emptyList());
replayAll();
- processor.process(inputKey, inputValue);
+ processor.process(new Record<>(inputKey, inputValue, 0L));
verifyAll();
}
@@ -102,7 +103,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null);
replayAll();
- processor.process(inputKey, inputValue);
+ processor.process(new Record<>(inputKey, inputValue, 0L));
verifyAll();
}
@@ -127,7 +128,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer);
replayAll();
- final org.apache.kafka.streams.processor.Processor<Integer, Integer> processor = processorSupplier.get();
+ final Processor<Integer, Integer, Integer, String> processor = processorSupplier.get();
verifyAll();
assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index fda87dc..8c65656 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
@@ -52,10 +52,9 @@ public class KStreamGlobalKTableJoinTest {
private final int[] expectedKeys = {0, 1, 2, 3};
private TopologyTestDriver driver;
- private MockProcessor<Integer, String> processor;
+ private MockApiProcessor<Integer, String, Void, Void> processor;
private StreamsBuilder builder;
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {
@@ -64,7 +63,7 @@ public class KStreamGlobalKTableJoinTest {
final GlobalKTable<String, String> table; // value of stream optionally contains key of table
final KeyValueMapper<Integer, String, String> keyMapper;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 9268997..fe6c1d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
@@ -51,11 +51,10 @@ public class KStreamGlobalKTableLeftJoinTest {
private final String globalTableTopic = "globalTableTopic";
private final int[] expectedKeys = {0, 1, 2, 3};
- private MockProcessor<Integer, String> processor;
+ private MockApiProcessor<Integer, String, Void, Void> processor;
private TopologyTestDriver driver;
private StreamsBuilder builder;
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {
@@ -64,7 +63,7 @@ public class KStreamGlobalKTableLeftJoinTest {
final GlobalKTable<String, String> table; // value of stream optionally contains key of table
final KeyValueMapper<Integer, String, String> keyMapper;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 9753453..0af5bd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -62,8 +62,9 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
@@ -100,12 +101,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamImplTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
- private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
private final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier =
() -> new Transformer<String, String, KeyValue<String, String>>() {
@Override
@@ -1527,7 +1526,7 @@ public class KStreamImplTest {
inputTopic.pipeInput("a", "v2");
inputTopic.pipeInput("b", "v1");
}
- final List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
+ final List<MockApiProcessor<String, String, Void, Void>> mockProcessors = processorSupplier.capturedProcessors(2);
assertThat(mockProcessors.get(0).processed(), equalTo(asList(new KeyValueTimestamp<>("a", "v1", 0),
new KeyValueTimestamp<>("a", "v2", 0))));
assertThat(mockProcessors.get(1).processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 37c9e49..2ffa048 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -63,15 +63,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- private final JoinWindows joinWindows = JoinWindows.of(ofMillis(50)).grace(Duration.ofMillis(50));
+ private final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(50), Duration.ofMillis(50));
private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
private final String errorMessagePrefix = "Window settings mismatch. WindowBytesStoreSupplier settings";
@@ -85,7 +83,7 @@ public class KStreamKStreamJoinTest {
left.join(
right,
Integer::sum,
- JoinWindows.of(ofMillis(100)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
);
@@ -115,8 +113,8 @@ public class KStreamKStreamJoinTest {
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
- newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
- newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+ newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100))).to("out-one");
+ newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100))).to("out-to");
assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
}
@@ -130,8 +128,8 @@ public class KStreamKStreamJoinTest {
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
- newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
- newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+ newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+ newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
final Topology topology = builder.build(props);
System.out.println(topology.describe().toString());
assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());
@@ -140,7 +138,7 @@ public class KStreamKStreamJoinTest {
@Test
public void shouldDisableLoggingOnStreamJoined() {
- final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+ final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
.withStoreName("store")
@@ -167,7 +165,7 @@ public class KStreamKStreamJoinTest {
@Test
public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
- final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+ final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
.withStoreName("store")
@@ -295,8 +293,8 @@ public class KStreamKStreamJoinTest {
}
@Test
- public void shouldExceptionWhenJoinStoresDontHaveUniqueNames() {
- final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ofMillis(50L));
+ public void shouldExceptionWhenJoinStoresDoNotHaveUniqueNames() {
+ final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), Duration.ofMillis(50L));
final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
final WindowBytesStoreSupplier thisStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
final WindowBytesStoreSupplier otherStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
@@ -310,7 +308,7 @@ public class KStreamKStreamJoinTest {
@Test
public void shouldJoinWithCustomStoreSuppliers() {
- final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L));
+ final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore(
"in-memory-join-store",
@@ -344,7 +342,7 @@ public class KStreamKStreamJoinTest {
final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KStream<String, Integer> joinedStream;
joinedStream = left.join(
@@ -361,7 +359,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic("left", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<String, Integer> inputTopicRight =
driver.createInputTopic("right", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<String, Integer> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<String, Integer, Void, Void> processor = supplier.theCapturedProcessor();
inputTopicLeft.pipeInput("A", 1, 1L);
inputTopicLeft.pipeInput("B", 1, 2L);
@@ -385,13 +383,13 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(100L)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
);
joined.process(supplier);
@@ -407,7 +405,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// push two items to the primary stream; the other window is empty
// w1 = {}
@@ -507,7 +505,7 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -529,7 +527,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// push two items to the primary stream; the other window is empty; this should not produce items yet
// w1 = {}
@@ -629,14 +627,14 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(100L)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
);
joined.process(supplier);
@@ -652,7 +650,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
long time = 0L;
// push two items to the primary stream; the other window is empty; this should produce no items
@@ -1192,14 +1190,14 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(0)).after(ofMillis(100)).grace(ofMillis(0)),
+ JoinWindows.ofTimeDifferenceAndGrace(ofMillis(0), ofMillis(0)).after(ofMillis(100)),
StreamJoined.with(Serdes.Integer(),
Serdes.String(),
Serdes.String())
@@ -1217,7 +1215,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
long time = 1000L;
// push four items with increasing timestamps to the primary stream; the other window is empty; this should produce no items
@@ -1460,7 +1458,7 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -1468,7 +1466,7 @@ public class KStreamKStreamJoinTest {
joined = stream1.join(
stream2,
MockValueJoiner.TOSTRING_JOINER,
- JoinWindows.of(ofMillis(0)).before(ofMillis(100)),
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).before(ofMillis(100)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
);
joined.process(supplier);
@@ -1484,7 +1482,7 @@ public class KStreamKStreamJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
long time = 1000L;
// push four items with increasing timestamps to the primary stream; the other window is empty; this should produce no items
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 4e2b6d8..2a29915 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -30,8 +30,8 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -47,8 +47,6 @@ import java.util.Set;
import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamLeftJoinTest {
private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -57,6 +55,7 @@ public class KStreamKStreamLeftJoinTest {
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ @SuppressWarnings("deprecation")
@Test
public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -66,7 +65,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -83,7 +82,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// Only 2 window stores should be available
assertEquals(2, driver.getAllStateStores().size());
@@ -116,6 +115,7 @@ public class KStreamKStreamLeftJoinTest {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -123,7 +123,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -140,7 +140,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// Only 2 window stores should be available
assertEquals(2, driver.getAllStateStores().size());
@@ -165,7 +165,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -182,7 +182,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// verifies non-joined duplicates are emitted when window has closed
inputTopic1.pipeInput(0, "A0", 0L);
@@ -221,7 +221,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -238,7 +238,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -276,7 +276,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -293,7 +293,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -331,7 +331,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -348,7 +348,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -383,7 +383,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -400,7 +400,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -462,7 +462,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -485,7 +485,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// 2 window stores + 1 shared window store should be available
assertEquals(3, driver.getAllStateStores().size());
@@ -573,7 +573,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -590,7 +590,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// push two items to the primary stream; the other window is empty; this should not produce any item yet
// w1 = {}
@@ -624,7 +624,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -647,7 +647,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// push two items to the primary stream; the other window is empty; this should not produce items because window has not closed
// w1 = {}
@@ -696,7 +696,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -719,7 +719,7 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long time = 0L;
// push two items to the primary stream; the other window is empty; this should not produce any items
@@ -751,7 +751,7 @@ public class KStreamKStreamLeftJoinTest {
private void testUpperWindowBound(final int[] expectedKeys,
final TopologyTestDriver driver,
- final MockProcessor<Integer, String> processor) {
+ final MockApiProcessor<Integer, String, Void, Void> processor) {
long time;
final TestInputTopic<Integer, String> inputTopic1 =
@@ -895,7 +895,7 @@ public class KStreamKStreamLeftJoinTest {
private void testLowerWindowBound(final int[] expectedKeys,
final TopologyTestDriver driver,
- final MockProcessor<Integer, String> processor) {
+ final MockApiProcessor<Integer, String, Void, Void> processor) {
long time;
final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2d9e320..0fcbfeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -30,8 +30,8 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -47,14 +47,13 @@ import java.util.Set;
import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamOuterJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ @SuppressWarnings("deprecation")
@Test
public void testOuterJoinDuplicatesWithFixDisabledOldApi() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -62,7 +61,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -79,7 +78,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// Only 2 window stores should be available
assertEquals(2, driver.getAllStateStores().size());
@@ -106,7 +105,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -123,7 +122,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// verifies non-joined duplicates are emitted when window has closed
inputTopic1.pipeInput(0, "A0", 0L);
@@ -173,7 +172,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -190,7 +189,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -228,7 +227,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -245,7 +244,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -283,7 +282,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -300,7 +299,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -338,7 +337,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -355,7 +354,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long windowStart = 0L;
@@ -393,7 +392,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -410,7 +409,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// push two items to the primary stream; the other window is empty; this should not produce any item yet
// w1 = {}
@@ -443,7 +442,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -466,7 +465,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// push one item to the primary stream; and one item in other stream; this should not produce items because there are no joins
// and window has not ended
@@ -542,7 +541,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -565,7 +564,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
// 2 window stores + 1 shared window store should be available
assertEquals(3, driver.getAllStateStores().size());
@@ -655,7 +654,7 @@ public class KStreamKStreamOuterJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -678,7 +677,7 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
final long time = 0L;
// push two items to the primary stream; the other window is empty; this should not produce items because window has not closed
@@ -711,7 +710,7 @@ public class KStreamKStreamOuterJoinTest {
private void testUpperWindowBound(final int[] expectedKeys,
final TopologyTestDriver driver,
- final MockProcessor<Integer, String> processor) {
+ final MockApiProcessor<Integer, String, Void, Void> processor) {
long time;
final TestInputTopic<Integer, String> inputTopic1 =
@@ -858,7 +857,7 @@ public class KStreamKStreamOuterJoinTest {
private void testLowerWindowBound(final int[] expectedKeys,
final TopologyTestDriver driver,
- final MockProcessor<Integer, String> processor) {
+ final MockApiProcessor<Integer, String, Void, Void> processor) {
long time;
final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 3c515b3..d162acf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -44,8 +44,8 @@ import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
@@ -61,12 +61,11 @@ public class KStreamKTableJoinTest {
private TestInputTopic<Integer, String> inputTableTopic;
private final int[] expectedKeys = {0, 1, 2, 3};
- private MockProcessor<Integer, String> processor;
+ private MockApiProcessor<Integer, String, Void, Void> processor;
private TopologyTestDriver driver;
private StreamsBuilder builder;
- private final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {
builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index d9f227c..d68f9bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
@@ -56,10 +56,9 @@ public class KStreamKTableLeftJoinTest {
private final int[] expectedKeys = {0, 1, 2, 3};
private TopologyTestDriver driver;
- private MockProcessor<Integer, String> processor;
+ private MockApiProcessor<Integer, String, Void, Void> processor;
private StreamsBuilder builder;
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {
builder = new StreamsBuilder();
@@ -67,7 +66,7 @@ public class KStreamKTableLeftJoinTest {
final KStream<Integer, String> stream;
final KTable<Integer, String> table;
- final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
stream = builder.stream(streamTopic, consumed);
table = builder.table(tableTopic, consumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index bf5ff26..c171b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -43,14 +43,13 @@ import static org.junit.Assert.assertThrows;
public class KStreamMapTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMap() {
final StreamsBuilder builder = new StreamsBuilder();
final String topicName = "topic";
final int[] expectedKeys = new int[] {0, 1, 2, 3};
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.map((key, value) -> KeyValue.pair(value, key)).process(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index fe1e4a3..f3a773b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -36,10 +36,9 @@ import static org.junit.Assert.assertArrayEquals;
public class KStreamMapValuesTest {
private final String topicName = "topic";
- private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
+ private final MockApiProcessorSupplier<Integer, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMapValues() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -64,7 +63,6 @@ public class KStreamMapValuesTest {
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMapValuesWithKeys() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index d8e70f9..b1e9d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -41,7 +41,6 @@ public class KStreamSelectKeyTest {
private final String topicName = "topic_key_select";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testSelectKey() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -58,7 +57,7 @@ public class KStreamSelectKeyTest {
final KStream<String, Integer> stream =
builder.stream(topicName, Consumed.with(Serdes.String(), Serdes.Integer()));
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.selectKey((key, value) -> keyMap.get(value)).process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -73,7 +72,6 @@ public class KStreamSelectKeyTest {
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
}
-
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 38c3fa7..8e8115f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -47,9 +47,9 @@ import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
@@ -83,7 +83,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@RunWith(Parameterized.class)
public class KStreamSlidingWindowAggregateTest {
@@ -101,7 +100,6 @@ public class KStreamSlidingWindowAggregateTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
- @SuppressWarnings("unchecked")
@Test
public void testAggregateSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -114,13 +112,13 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.as(storeSupplier)
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
@@ -158,7 +156,6 @@ public class KStreamSlidingWindowAggregateTest {
assertEquals(expected, actual);
}
- @SuppressWarnings("unchecked")
@Test
public void testReduceSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -171,12 +168,12 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
.reduce(
MockReducer.STRING_ADDER,
Materialized.as(storeSupplier)
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
@@ -227,14 +224,14 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.as(storeSupplier)
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -390,7 +387,7 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table1 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@@ -399,14 +396,14 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.as(storeSupplier2)
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table1.toStream().process(supplier);
table2.toStream().process(supplier);
@@ -422,7 +419,7 @@ public class KStreamSlidingWindowAggregateTest {
inputTopic1.pipeInput("B", "2", 11L);
inputTopic1.pipeInput("C", "3", 12L);
- final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+ final List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = supplier.capturedProcessors(3);
processors.get(0).checkAndClearProcessResult(
// left windows created by the first set of records to table 1
@@ -492,7 +489,6 @@ public class KStreamSlidingWindowAggregateTest {
}
}
- @SuppressWarnings("unchecked")
@Test
public void testEarlyRecordsSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -501,13 +497,13 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(50), ofMillis(200)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(50), ofMillis(200)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -526,7 +522,7 @@ public class KStreamSlidingWindowAggregateTest {
for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
final Windowed<String> window = entry.key();
final Long start = window.window().start();
- final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
actual.replace(start, valueAndTimestamp);
}
@@ -543,7 +539,6 @@ public class KStreamSlidingWindowAggregateTest {
assertEquals(expected, actual);
}
- @SuppressWarnings("unchecked")
@Test
public void testEarlyRecordsRepeatedInput() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -552,13 +547,13 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(5), ofMillis(20)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(5), ofMillis(20)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -578,7 +573,7 @@ public class KStreamSlidingWindowAggregateTest {
for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
final Windowed<String> window = entry.key();
final Long start = window.window().start();
- final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
actual.replace(start, valueAndTimestamp);
}
@@ -603,14 +598,14 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.as(storeSupplier)
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -793,7 +788,6 @@ public class KStreamSlidingWindowAggregateTest {
}
}
- @SuppressWarnings("unchecked")
@Test
public void testAggregateRandomInput() {
@@ -821,7 +815,7 @@ public class KStreamSlidingWindowAggregateTest {
},
Materialized.as(storeSupplier)
);
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table.toStream().process(supplier);
final long seed = new Random().nextLong();
final Random shuffle = new Random(seed);
@@ -856,8 +850,8 @@ public class KStreamSlidingWindowAggregateTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic1 =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
- for (int i = 0; i < input.size(); i++) {
- inputTopic1.pipeInput("A", input.get(i).value(), input.get(i).timestamp());
+ for (final ValueAndTimestamp<String> i : input) {
+ inputTopic1.pipeInput("A", i.value(), i.timestamp());
}
}
@@ -866,7 +860,7 @@ public class KStreamSlidingWindowAggregateTest {
for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
final Windowed<String> window = entry.key();
final Long start = window.window().start();
- final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
if (results.putIfAbsent(start, valueAndTimestamp) != null) {
results.replace(start, valueAndTimestamp);
}
@@ -878,12 +872,8 @@ public class KStreamSlidingWindowAggregateTest {
t
);
} catch (final Throwable t) {
- final StringBuilder sb =
- new StringBuilder()
- .append("Exception in randomized scenario. Reproduce with seed: ")
- .append(seed)
- .append(".");
- throw new AssertionError(sb.toString(), t);
+ final String msg = "Exception in randomized scenario. Reproduce with seed: " + seed + ".";
+ throw new AssertionError(msg, t);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 983e52d..0f2975f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -27,9 +27,10 @@ import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
import org.apache.kafka.test.StreamsTestUtils;
@@ -46,13 +47,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
@RunWith(EasyMockRunner.class)
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamTransformValuesTest {
private final String topicName = "topic";
private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
@Mock(MockType.NICE)
- private ProcessorContext context;
+ private InternalProcessorContext context;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
@@ -64,7 +64,7 @@ public class KStreamTransformValuesTest {
private int total = 0;
@Override
- public void init(final ProcessorContext context) { }
+ public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { }
@Override
public Integer transform(final Number value) {
@@ -107,7 +107,7 @@ public class KStreamTransformValuesTest {
private int total = 0;
@Override
- public void init(final ProcessorContext context) { }
+ public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { }
@Override
public Integer transform(final Integer readOnlyKey, final Number value) {
@@ -145,7 +145,7 @@ public class KStreamTransformValuesTest {
public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
final NoOpValueTransformerWithKeySupplier<String, String> transformer = new NoOpValueTransformerWithKeySupplier<>();
final KStreamTransformValues<String, String, String> transformValues = new KStreamTransformValues<>(transformer);
- final org.apache.kafka.streams.processor.Processor<String, String> processor = transformValues.get();
+ final Processor<String, String, String, String> processor = transformValues.get();
processor.init(context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index df40c94..750f7f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -38,9 +38,9 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
import org.junit.Test;
@@ -62,12 +62,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("deprecation")
public class KStreamWindowAggregateTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testAggBasic() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -76,10 +74,10 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(100)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -145,7 +143,6 @@ public class KStreamWindowAggregateTest {
);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -155,16 +152,16 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table1 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(100)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
table1.toStream().process(supplier);
final KTable<Windowed<String>, String> table2 = builder
.stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(100)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()));
table2.toStream().process(supplier);
@@ -181,7 +178,7 @@ public class KStreamWindowAggregateTest {
inputTopic1.pipeInput("D", "4", 3L);
inputTopic1.pipeInput("A", "1", 9L);
- final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+ final List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = supplier.capturedProcessors(3);
processors.get(0).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 0),
@@ -274,7 +271,7 @@ public class KStreamWindowAggregateTest {
builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.toStringInstance("+"),
@@ -299,7 +296,7 @@ public class KStreamWindowAggregateTest {
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(90)).advanceBy(ofMillis(5)))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
@@ -362,7 +359,7 @@ public class KStreamWindowAggregateTest {
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(90L)).advanceBy(ofMillis(10)))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index d3ed6b5..0974ed6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -36,8 +36,6 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -66,12 +64,11 @@ public class KTableFilterTest {
private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private void doTestKTable(final StreamsBuilder builder,
final KTable<String, Integer> table2,
final KTable<String, Integer> table3,
final String topic) {
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
table3.toStream().process(supplier);
@@ -86,7 +83,7 @@ public class KTableFilterTest {
inputTopic.pipeInput("B", null, 15L);
}
- final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+ final List<MockApiProcessor<String, Integer, Void, Void>> processors = supplier.capturedProcessors(2);
processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", null, 10),
new KeyValueTimestamp<>("B", 2, 5),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 462423a..3dd8a2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -45,10 +45,10 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
@@ -83,7 +83,6 @@ public class KTableImplTest {
table = new StreamsBuilder().table("test");
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -93,7 +92,7 @@ public class KTableImplTest {
final KTable<String, String> table1 = builder.table(topic1, consumed);
- final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Object, Void, Void> supplier = new MockApiProcessorSupplier<>();
table1.toStream().process(supplier);
final KTable<String, Integer> table2 = table1.mapValues(s -> Integer.valueOf(s));
@@ -117,7 +116,7 @@ public class KTableImplTest {
inputTopic.pipeInput("A", "06", 8L);
}
- final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+ final List<MockApiProcessor<String, Object, Void, Void>> processors = supplier.capturedProcessors(4);
assertEquals(asList(
new KeyValueTimestamp<>("A", "01", 5),
new KeyValueTimestamp<>("B", "02", 100),
@@ -152,7 +151,6 @@ public class KTableImplTest {
processors.get(3).processed());
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMaterializedKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -162,7 +160,7 @@ public class KTableImplTest {
final KTable<String, String> table1 = builder.table(topic1, consumed, Materialized.as("fred"));
- final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Object, Void, Void> supplier = new MockApiProcessorSupplier<>();
table1.toStream().process(supplier);
final KTable<String, Integer> table2 = table1.mapValues(s -> Integer.valueOf(s));
@@ -186,7 +184,7 @@ public class KTableImplTest {
inputTopic.pipeInput("A", "06", 8L);
}
- final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+ final List<MockApiProcessor<String, Object, Void, Void>> processors = supplier.capturedProcessors(4);
assertEquals(asList(
new KeyValueTimestamp<>("A", "01", 5),
new KeyValueTimestamp<>("B", "02", 100),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index e761968..360c37e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -39,7 +39,6 @@ import static org.junit.Assert.assertEquals;
public class KTableMapKeysTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMapKeysConvertingToStream() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -60,7 +59,7 @@ public class KTableMapKeysTest {
final int[] originalKeys = new int[] {1, 2, 3};
final String[] values = new String[] {"V_ONE", "V_TWO", "V_THREE"};
- final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
convertedStream.process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index ff6e1b9..a7858b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@@ -56,7 +55,7 @@ public class KTableMapValuesTest {
private void doTestKTable(final StreamsBuilder builder,
final String topic1,
- final MockProcessorSupplier<String, Integer> supplier) {
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic1 =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
@@ -71,7 +70,6 @@ public class KTableMapValuesTest {
}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -80,13 +78,12 @@ public class KTableMapValuesTest {
final KTable<String, String> table1 = builder.table(topic1, consumed);
final KTable<String, Integer> table2 = table1.mapValues(value -> value.charAt(0) - 48);
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
doTestKTable(builder, topic1, supplier);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testQueryableKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -99,7 +96,7 @@ public class KTableMapValuesTest {
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName")
.withValueSerde(Serdes.Integer()));
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
table2.toStream().process(supplier);
doTestKTable(builder, topic1, supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 83b8ac8..68c005a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Ignore;
import org.junit.Test;
@@ -61,7 +60,6 @@ public class KTableSourceTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -69,7 +67,7 @@ public class KTableSourceTest {
final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()));
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
table1.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index f26fbc6..8c7d179 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -54,7 +54,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SessionWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
@@ -66,7 +65,7 @@ public class SessionWindowedKStreamImplTest {
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SessionWindows.with(ofMillis(500)));
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)));
}
@Test
@@ -81,7 +80,7 @@ public class SessionWindowedKStreamImplTest {
}
private void shouldCountSessionWindowed() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.count()
.toStream()
.process(supplier);
@@ -107,7 +106,7 @@ public class SessionWindowedKStreamImplTest {
@Test
public void shouldReduceWindowed() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.reduce(MockReducer.STRING_ADDER)
.toStream()
.process(supplier);
@@ -133,7 +132,7 @@ public class SessionWindowedKStreamImplTest {
@Test
public void shouldAggregateSessionWindowed() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
sessionMerger,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
index e4a965e..64df966 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -54,7 +54,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SlidingWindowedKStreamImplTest {
private static final String TOPIC = "input";
@@ -67,12 +66,12 @@ public class SlidingWindowedKStreamImplTest {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
windowedStream = stream.
groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+ .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
}
@Test
public void shouldCountSlidingWindows() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.count()
.toStream()
@@ -113,7 +112,7 @@ public class SlidingWindowedKStreamImplTest {
@Test
public void shouldReduceSlidingWindows() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.reduce(MockReducer.STRING_ADDER)
.toStream()
@@ -154,7 +153,7 @@ public class SlidingWindowedKStreamImplTest {
@Test
public void shouldAggregateSlidingWindows() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.aggregate(
MockInitializer.STRING_INIT,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index aef79c8..f5f3ff8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -35,8 +35,8 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -52,7 +52,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class TimeWindowedKStreamImplTest {
private static final String TOPIC = "input";
private final StreamsBuilder builder = new StreamsBuilder();
@@ -64,12 +63,12 @@ public class TimeWindowedKStreamImplTest {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
windowedStream = stream.
groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)));
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
}
@Test
public void shouldCountWindowed() {
- final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.count()
.toStream()
@@ -94,7 +93,7 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldReduceWindowed() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.reduce(MockReducer.STRING_ADDER)
.toStream()
@@ -119,7 +118,7 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldAggregateWindowed() {
- final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
windowedStream
.aggregate(
MockInitializer.STRING_INIT,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index a898338..f8a7073 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.Test;
@@ -31,7 +33,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class GraphGraceSearchUtilTest {
@Test
public void shouldThrowOnNull() {
@@ -50,12 +51,12 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
"stateful",
new ProcessorParameters<>(
- () -> new org.apache.kafka.streams.processor.Processor<String, Long>() {
+ () -> new Processor<String, Long, String, Long>() {
@Override
- public void init(final ProcessorContext context) {}
+ public void init(final ProcessorContext<String, Long> context) {}
@Override
- public void process(final String key, final Long value) {}
+ public void process(final Record<String, Long> record) {}
@Override
public void close() {}
@@ -78,7 +79,7 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldExtractGraceFromKStreamWindowAggregateNode() {
- final TimeWindows windows = TimeWindows.of(ofMillis(10L)).grace(ofMillis(1234L));
+ final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
@@ -99,7 +100,7 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
- final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
+ final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
"asdf",
@@ -122,7 +123,7 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
- final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
+ final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -134,12 +135,12 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
"stateful",
new ProcessorParameters<>(
- () -> new org.apache.kafka.streams.processor.Processor<String, Long>() {
+ () -> new Processor<String, Long, String, Long>() {
@Override
- public void init(final ProcessorContext context) {}
+ public void init(final ProcessorContext<String, Long> context) {}
@Override
- public void process(final String key, final Long value) {}
+ public void process(final Record<String, Long> record) {}
@Override
public void close() {}
@@ -159,7 +160,7 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
- final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
+ final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
@@ -191,7 +192,7 @@ public class GraphGraceSearchUtilTest {
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
- SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L)),
+ SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)),
"asdf",
null,
null,
@@ -206,7 +207,7 @@ public class GraphGraceSearchUtilTest {
"asdf",
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
- TimeWindows.of(ofMillis(10L)).grace(ofMillis(4321L)),
+ TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
"asdf",
null,
null
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
index 99be7f8..987784b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
@@ -17,19 +17,21 @@
package org.apache.kafka.streams.kstream.internals.graph;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class TableProcessorNodeTest {
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class TestProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+ private static class TestProcessor implements Processor<String, String, String, String> {
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final ProcessorContext<String, String> context) {
}
@Override
- public void process(final String key, final String value) {
+ public void process(final Record<String, String> record) {
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 87a4c68..dfa3f9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -27,7 +27,8 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -49,29 +50,28 @@ import static org.junit.Assert.assertTrue;
public class ProcessorNodeTest {
- @SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
- final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet());
assertThrows(StreamsException.class, () -> node.init(null));
}
- @SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
- final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet());
assertThrows(StreamsException.class, () -> node.init(null));
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class ExceptionalProcessor implements org.apache.kafka.streams.processor.Processor<Object, Object> {
+ private static class ExceptionalProcessor implements Processor<Object, Object, Object, Object> {
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
throw new RuntimeException();
}
@Override
- public void process(final Object key, final Object value) {
+ public void process(final Record<Object, Object> record) {
throw new RuntimeException();
}
@@ -81,21 +81,9 @@ public class ProcessorNodeTest {
}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class NoOpProcessor implements org.apache.kafka.streams.processor.Processor<Object, Object> {
+ private static class NoOpProcessor implements Processor<Object, Object, Object, Object> {
@Override
- public void init(final ProcessorContext context) {
-
- }
-
- @Override
- public void process(final Object key, final Object value) {
-
- }
-
- @Override
- public void close() {
-
+ public void process(final Record<Object, Object> record) {
}
}
@@ -105,7 +93,8 @@ public class ProcessorNodeTest {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
- final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new NoOpProcessor(), Collections.emptySet());
node.init(context);
final String threadId = Thread.currentThread().getName();
@@ -140,9 +129,7 @@ public class ProcessorNodeTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
- .flatMapValues(value -> {
- return Collections.singletonList("");
- });
+ .flatMapValues(value -> Collections.singletonList(""));
final Topology topology = builder.build();
final Properties config = new Properties();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
@@ -164,9 +151,7 @@ public class ProcessorNodeTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
- .flatMapValues(value -> {
- return Collections.singletonList("");
- });
+ .flatMapValues(value -> Collections.singletonList(""));
final Topology topology = builder.build();
final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology));
@@ -178,11 +163,11 @@ public class ProcessorNodeTest {
private static class ClassCastProcessor extends ExceptionalProcessor {
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
}
@Override
- public void process(final Object key, final Object value) {
+ public void process(final Record<Object, Object> record) {
throw new ClassCastException("Incompatible types simulation exception.");
}
}
@@ -193,7 +178,8 @@ public class ProcessorNodeTest {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
- final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
node.init(context);
final StreamsException se = assertThrows(
StreamsException.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index cde573a..fe1bb3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.test.MockProcessorNode;
@@ -29,12 +28,7 @@ public class PunctuationQueueTest {
private final MockProcessorNode<String, String, ?, ?> node = new MockProcessorNode<>();
private final PunctuationQueue queue = new PunctuationQueue();
- private final Punctuator punctuator = new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- node.mockProcessor.punctuatedStreamTime().add(timestamp);
- }
- };
+ private final Punctuator punctuator = timestamp -> node.mockProcessor.punctuatedStreamTime().add(timestamp);
@Test
public void testPunctuationInterval() {
@@ -43,12 +37,7 @@ public class PunctuationQueueTest {
queue.schedule(sched);
- final ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
- @Override
- public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
- punctuator.punctuate(timestamp);
- }
- };
+ final ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> punctuator.punctuate(timestamp);
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
@@ -82,12 +71,8 @@ public class PunctuationQueueTest {
queue.schedule(sched);
- final ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
- @Override
- public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
- punctuator.punctuate(timestamp);
- }
- };
+ final ProcessorNodePunctuator processorNodePunctuator =
+ (node, timestamp, type, punctuator) -> punctuator.punctuate(timestamp);
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
@@ -121,13 +106,10 @@ public class PunctuationQueueTest {
final Cancellable cancellable = queue.schedule(sched);
- final ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
- @Override
- public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
- punctuator.punctuate(timestamp);
- // simulate scheduler cancelled from within punctuator
- cancellable.cancel();
- }
+ final ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> {
+ punctuator.punctuate(timestamp);
+ // simulate scheduler cancelled from within punctuator
+ cancellable.cancel();
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
@@ -139,18 +121,4 @@ public class PunctuationQueueTest {
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
}
-
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class TestProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
-
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public void process(final String key, final String value) {}
-
- @Override
- public void close() {}
- }
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index eb817cc..60aeb7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -44,6 +44,8 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
@@ -134,10 +136,7 @@ public class RepartitionOptimizingTest {
runTest(StreamsConfig.NO_OPTIMIZATION, FOUR_REPARTITION_TOPICS);
}
-
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
-
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> sourceStream =
@@ -258,8 +257,7 @@ public class RepartitionOptimizingTest {
return keyValueList;
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private static class SimpleProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+ private static class SimpleProcessor implements Processor<String, String, Void, Void> {
final List<String> valueList;
@@ -268,8 +266,8 @@ public class RepartitionOptimizingTest {
}
@Override
- public void process(final String key, final String value) {
- valueList.add(value);
+ public void process(final Record<String, String> record) {
+ valueList.add(record.value());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b29961..bebd417 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -64,7 +64,11 @@ import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -1715,20 +1719,19 @@ public class StreamThreadTest {
}
@Test
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public void shouldPunctuateActiveTask() {
final List<Long> punctuatedStreamTime = new ArrayList<>();
final List<Long> punctuatedWallClockTime = new ArrayList<>();
- final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor =
- () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+ final ProcessorSupplier<Object, Object, Void, Void> punctuateProcessor =
+ () -> new ContextualProcessor<Object, Object, Void, Void>() {
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
}
@Override
- public void process(final Object key, final Object value) {}
+ public void process(final Record<Object, Object> record) {}
};
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
@@ -1786,7 +1789,6 @@ public class StreamThreadTest {
}
@Test
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
() -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
@@ -1806,13 +1808,8 @@ public class StreamThreadTest {
};
final List<Long> peekedContextTime = new ArrayList<>();
- final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
- () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
- @Override
- public void process(final Object key, final Object value) {
- peekedContextTime.add(context.timestamp());
- }
- };
+ final ProcessorSupplier<Object, Object, Void, Void> peekProcessor =
+ () -> (Processor<Object, Object, Void, Void>) record -> peekedContextTime.add(record.timestamp());
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
.transform(punctuateProcessor)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index b9b24da..3b1aa44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -99,7 +99,6 @@ public class EosTestClient extends SmokeTestUtil {
}
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private KafkaStreams createKafkaStreams(final Properties props) {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 86f7583..936a41a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -50,7 +50,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
-@SuppressWarnings("deprecation")
public class SmokeTestClient extends SmokeTestUtil {
private final String name;
@@ -163,7 +162,6 @@ public class SmokeTestClient extends SmokeTestUtil {
return fullProps;
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public Topology getTopology() {
final StreamsBuilder builder = new StreamsBuilder();
final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
@@ -177,7 +175,7 @@ public class SmokeTestClient extends SmokeTestUtil {
final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
final KTable<Windowed<String>, Integer> minAggregation = groupedData
- .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1)))
.aggregate(
() -> Integer.MAX_VALUE,
(aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
@@ -197,7 +195,7 @@ public class SmokeTestClient extends SmokeTestUtil {
.to("min", Produced.with(stringSerde, intSerde));
final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
- .windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
.reduce((l, r) -> l + r);
streamify(smallWindowSum, "sws-raw");
@@ -212,7 +210,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// max
groupedData
- .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
.aggregate(
() -> Integer.MIN_VALUE,
(aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
@@ -229,7 +227,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// sum
groupedData
- .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
.aggregate(
() -> 0L,
(aggKey, value, aggregate) -> (long) value + aggregate,
@@ -244,7 +242,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// cnt
groupedData
- .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
.count(Materialized.as("uwin-cnt"))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 1222a81..e0c45d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -23,68 +23,67 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
import java.time.Instant;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SmokeTestUtil {
final static int END = Integer.MAX_VALUE;
- static org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
+ static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, "");
}
- static org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final String name) {
- return new org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object>() {
+ static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
+ return () -> new ContextualProcessor<Object, Object, Void, Void>() {
+ private int numRecordsProcessed = 0;
+ private long smallestOffset = Long.MAX_VALUE;
+ private long largestOffset = Long.MIN_VALUE;
+
@Override
- public org.apache.kafka.streams.processor.Processor<Object, Object> get() {
- return new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
- private int numRecordsProcessed = 0;
- private long smallestOffset = Long.MAX_VALUE;
- private long largestOffset = Long.MIN_VALUE;
-
- @Override
- public void init(final ProcessorContext context) {
- super.init(context);
- System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
- System.out.flush();
- numRecordsProcessed = 0;
- smallestOffset = Long.MAX_VALUE;
- largestOffset = Long.MIN_VALUE;
- }
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+ System.out.flush();
+ numRecordsProcessed = 0;
+ smallestOffset = Long.MAX_VALUE;
+ largestOffset = Long.MIN_VALUE;
+ }
- @Override
- public void process(final Object key, final Object value) {
- numRecordsProcessed++;
- if (numRecordsProcessed % 100 == 0) {
- System.out.printf("%s: %s%n", name, Instant.now());
- System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
- }
-
- if (smallestOffset > context().offset()) {
- smallestOffset = context().offset();
- }
- if (largestOffset < context().offset()) {
- largestOffset = context().offset();
- }
+ @Override
+ public void process(final Record<Object, Object> record) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.printf("%s: %s%n", name, Instant.now());
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+ }
+
+ if (context().recordMetadata().isPresent()) {
+ if (smallestOffset > context().recordMetadata().get().offset()) {
+ smallestOffset = context().recordMetadata().get().offset();
}
-
- @Override
- public void close() {
- System.out.printf("Close processor for task %s%n", context().taskId());
- System.out.println("processed " + numRecordsProcessed + " records");
- final long processed;
- if (largestOffset >= smallestOffset) {
- processed = 1L + largestOffset - smallestOffset;
- } else {
- processed = 0L;
- }
- System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
- System.out.flush();
+ if (largestOffset < context().recordMetadata().get().offset()) {
+ largestOffset = context().recordMetadata().get().offset();
}
- };
+ }
+ }
+
+ @Override
+ public void close() {
+ System.out.printf("Close processor for task %s%n", context().taskId());
+ System.out.println("processed " + numRecordsProcessed + " records");
+ final long processed;
+ if (largestOffset >= smallestOffset) {
+ processed = 1L + largestOffset - smallestOffset;
+ } else {
+ processed = 0L;
+ }
+ System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+ System.out.flush();
}
};
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index f20a532..94f623e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -89,7 +89,6 @@ public class StreamsUpgradeTest {
});
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public static KafkaStreams buildStreams(final Properties streamsProperties) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Void, Void> dataStream = builder.stream("data");