You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/09/22 19:27:07 UTC
[kafka] branch trunk updated: KAFKA-10544: Migrate KTable aggregate
and reduce (#11316)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d15969a KAFKA-10544: Migrate KTable aggregate and reduce (#11316)
d15969a is described below
commit d15969af286f4f239c0a972f4135ef429381ccd6
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Wed Sep 22 20:25:41 2021 +0100
KAFKA-10544: Migrate KTable aggregate and reduce (#11316)
As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KTable aggregate/reduce operations.
Reviewers: John Roesler <vv...@apache.org>
---
.../kstream/internals/KGroupedTableImpl.java | 45 +++++++--------
.../streams/kstream/internals/KTableAggregate.java | 65 +++++++++++-----------
.../streams/kstream/internals/KTableReduce.java | 39 ++++++-------
.../kstream/internals/KGroupedTableImplTest.java | 13 ++---
.../kstream/internals/KTableAggregateTest.java | 13 ++---
.../kstream/internals/KTableReduceTest.java | 14 ++---
6 files changed, 92 insertions(+), 97 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 111d169..da35dac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRep
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
@@ -67,11 +68,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
this.userProvidedRepartitionTopicName = groupedInternal.name();
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private <T> KTable<K, T> doAggregate(final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier,
+ private <VAgg> KTable<K, VAgg> doAggregate(final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier,
final NamedInternal named,
final String functionName,
- final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+ final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {
final String sinkName = named.suffixWithOrElseGet("-sink", builder, KStreamImpl.SINK_NAME);
final String sourceName = named.suffixWithOrElseGet("-source", builder, KStreamImpl.SOURCE_NAME);
@@ -145,8 +145,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valueSerde);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
+ final ProcessorSupplier<K, Change<V>, K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(),
adder,
subtractor);
@@ -177,8 +176,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
materializedInternal.withValueSerde(Serdes.Long());
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
+ final ProcessorSupplier<K, Change<V>, K, Change<Long>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
countInitializer,
countAdder,
@@ -198,33 +196,32 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
}
@Override
- public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> adder,
- final Aggregator<? super K, ? super V, VR> subtractor,
- final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+ public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+ final Aggregator<? super K, ? super V, VAgg> adder,
+ final Aggregator<? super K, ? super V, VAgg> subtractor,
+ final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(initializer, adder, subtractor, NamedInternal.empty(), materialized);
}
@Override
- public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
- final Aggregator<? super K, ? super V, VR> adder,
- final Aggregator<? super K, ? super V, VR> subtractor,
+ public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+ final Aggregator<? super K, ? super V, VAgg> adder,
+ final Aggregator<? super K, ? super V, VAgg> subtractor,
final Named named,
- final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+ final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+ final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
+ final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
initializer,
adder,
@@ -233,17 +230,17 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
}
@Override
- public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
- final Aggregator<? super K, ? super V, T> adder,
- final Aggregator<? super K, ? super V, T> subtractor,
+ public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+ final Aggregator<? super K, ? super V, VAgg> adder,
+ final Aggregator<? super K, ? super V, VAgg> subtractor,
final Named named) {
return aggregate(initializer, adder, subtractor, named, Materialized.with(keySerde, null));
}
@Override
- public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
- final Aggregator<? super K, ? super V, T> adder,
- final Aggregator<? super K, ? super V, T> subtractor) {
+ public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+ final Aggregator<? super K, ? super V, VAgg> adder,
+ final Aggregator<? super K, ? super V, VAgg> subtractor) {
return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 3ff4dfa..410a49b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -19,25 +19,27 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
+public class KTableAggregate<KIn, VIn, VAgg> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VAgg> {
private final String storeName;
- private final Initializer<T> initializer;
- private final Aggregator<? super K, ? super V, T> add;
- private final Aggregator<? super K, ? super V, T> remove;
+ private final Initializer<VAgg> initializer;
+ private final Aggregator<? super KIn, ? super VIn, VAgg> add;
+ private final Aggregator<? super KIn, ? super VIn, VAgg> remove;
private boolean sendOldValues = false;
KTableAggregate(final String storeName,
- final Initializer<T> initializer,
- final Aggregator<? super K, ? super V, T> add,
- final Aggregator<? super K, ? super V, T> remove) {
+ final Initializer<VAgg> initializer,
+ final Aggregator<? super KIn, ? super VIn, VAgg> add,
+ final Aggregator<? super KIn, ? super VIn, VAgg> remove) {
this.storeName = storeName;
this.initializer = initializer;
this.add = add;
@@ -52,19 +54,18 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
}
@Override
- public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
+ public Processor<KIn, Change<VIn>, KIn, Change<VAgg>> get() {
return new KTableAggregateProcessor();
}
- private class KTableAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
- private TimestampedKeyValueStore<K, T> store;
- private TimestampedTupleForwarder<K, T> tupleForwarder;
+ private class KTableAggregateProcessor implements Processor<KIn, Change<VIn>, KIn, Change<VAgg>> {
+ private TimestampedKeyValueStore<KIn, VAgg> store;
+ private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
- super.init(context);
- store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+ public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
+ store = (TimestampedKeyValueStore<KIn, VAgg>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
context,
@@ -76,52 +77,52 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
* @throws StreamsException if key is null
*/
@Override
- public void process(final K key, final Change<V> value) {
+ public void process(final Record<KIn, Change<VIn>> record) {
// the keys should never be null
- if (key == null) {
+ if (record.key() == null) {
throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
}
- final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
- final T oldAgg = getValueOrNull(oldAggAndTimestamp);
- final T intermediateAgg;
- long newTimestamp = context().timestamp();
+ final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key());
+ final VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
+ final VAgg intermediateAgg;
+ long newTimestamp = record.timestamp();
// first try to remove the old value
- if (value.oldValue != null && oldAgg != null) {
- intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
- newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+ if (record.value().oldValue != null && oldAgg != null) {
+ intermediateAgg = remove.apply(record.key(), record.value().oldValue, oldAgg);
+ newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
} else {
intermediateAgg = oldAgg;
}
// then try to add the new value
- final T newAgg;
- if (value.newValue != null) {
- final T initializedAgg;
+ final VAgg newAgg;
+ if (record.value().newValue != null) {
+ final VAgg initializedAgg;
if (intermediateAgg == null) {
initializedAgg = initializer.apply();
} else {
initializedAgg = intermediateAgg;
}
- newAgg = add.apply(key, value.newValue, initializedAgg);
+ newAgg = add.apply(record.key(), record.value().newValue, initializedAgg);
if (oldAggAndTimestamp != null) {
- newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+ newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
}
} else {
newAgg = intermediateAgg;
}
// update the store with the new value
- store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
- tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
+ store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
+ tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}
@Override
- public KTableValueGetterSupplier<K, T> view() {
+ public KTableValueGetterSupplier<KIn, VAgg> view() {
return new KTableMaterializedValueGetterSupplier<>(storeName);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 72f75ea..d43f6df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -18,13 +18,15 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
+public class KTableReduce<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
private final String storeName;
private final Reducer<V> addReducer;
@@ -46,19 +48,18 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
}
@Override
- public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
+ public Processor<K, Change<V>, K, Change<V>> get() {
return new KTableReduceProcessor();
}
- private class KTableReduceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
+ private class KTableReduceProcessor implements Processor<K, Change<V>, K, Change<V>> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
- public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
- super.init(context);
+ public void init(final ProcessorContext<K, Change<V>> context) {
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
@@ -71,42 +72,42 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
* @throws StreamsException if key is null
*/
@Override
- public void process(final K key, final Change<V> value) {
+ public void process(final Record<K, Change<V>> record) {
// the keys should never be null
- if (key == null) {
+ if (record.key() == null) {
throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null.");
}
- final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+ final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(record.key());
final V oldAgg = getValueOrNull(oldAggAndTimestamp);
final V intermediateAgg;
long newTimestamp;
// first try to remove the old value
- if (value.oldValue != null && oldAgg != null) {
- intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
- newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+ if (record.value().oldValue != null && oldAgg != null) {
+ intermediateAgg = removeReducer.apply(oldAgg, record.value().oldValue);
+ newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
} else {
intermediateAgg = oldAgg;
- newTimestamp = context().timestamp();
+ newTimestamp = record.timestamp();
}
// then try to add the new value
final V newAgg;
- if (value.newValue != null) {
+ if (record.value().newValue != null) {
if (intermediateAgg == null) {
- newAgg = value.newValue;
+ newAgg = record.value().newValue;
} else {
- newAgg = addReducer.apply(intermediateAgg, value.newValue);
- newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+ newAgg = addReducer.apply(intermediateAgg, record.value().newValue);
+ newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
}
} else {
newAgg = intermediateAgg;
}
// update the store with the new value
- store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
- tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
+ store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
+ tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 39cd241..3b5295c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -34,9 +34,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -126,9 +126,8 @@ public class KGroupedTableImplTest {
Materialized.as(INVALID_STORE_NAME)));
}
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- private MockProcessorSupplier<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
- final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ private MockApiProcessorSupplier<String, Integer, Void, Void> getReducedResults(final KTable<String, Integer> inputKTable) {
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
inputKTable
.toStream()
.process(supplier);
@@ -173,7 +172,7 @@ public class KGroupedTableImplTest {
MockReducer.INTEGER_SUBTRACTOR,
Materialized.as("reduced"));
- final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
assertEquals(reduced.queryableStoreName(), "reduced");
@@ -195,7 +194,7 @@ public class KGroupedTableImplTest {
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
- final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
assertNull(reduced.queryableStoreName());
@@ -219,7 +218,7 @@ public class KGroupedTableImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer()));
- final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
+ final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
{
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index a61ed12..220734f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -32,10 +32,10 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.util.Properties;
@@ -49,12 +49,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.junit.Assert.assertEquals;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableAggregateTest {
private final Serde<String> stringSerde = Serdes.String();
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
private final Grouped<String, String> stringSerialized = Grouped.with(stringSerde, stringSerde);
- private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+ private final MockApiProcessorSupplier<String, Object, Void, Void> supplier = new MockApiProcessorSupplier<>();
private final static Properties CONFIG = mkProperties(mkMap(
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())));
@@ -169,7 +168,7 @@ public class KTableAggregateTest {
private static void testCountHelper(final StreamsBuilder builder,
final String input,
- final MockProcessorSupplier<String, Object> supplier) {
+ final MockApiProcessorSupplier<String, Object, Void, Void> supplier) {
try (
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
@@ -229,7 +228,7 @@ public class KTableAggregateTest {
public void testRemoveOldBeforeAddNew() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "count-test-input";
- final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+ final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
builder
.table(input, consumed)
@@ -253,7 +252,7 @@ public class KTableAggregateTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+ final MockApiProcessor<String, String, Void, Void> proc = supplier.theCapturedProcessor();
inputTopic.pipeInput("11", "A", 10L);
inputTopic.pipeInput("12", "B", 8L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index 0f5cc6b..89aa17a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -31,14 +33,13 @@ import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableReduceTest {
@Test
public void shouldAddAndSubtract() {
final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>();
- final org.apache.kafka.streams.processor.Processor<String, Change<Set<String>>> reduceProcessor =
+ final Processor<String, Change<Set<String>>, String, Change<Set<String>>> reduceProcessor =
new KTableReduce<String, Set<String>>(
"myStore",
this::unionNotNullArgs,
@@ -52,14 +53,11 @@ public class KTableReduceTest {
reduceProcessor.init(context);
context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
- context.setTime(10L);
- reduceProcessor.process("A", new Change<>(singleton("a"), null));
+ reduceProcessor.process(new Record<>("A", new Change<>(singleton("a"), null), 10L));
assertEquals(ValueAndTimestamp.make(singleton("a"), 10L), myStore.get("A"));
- context.setTime(15L);
- reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
+ reduceProcessor.process(new Record<>("A", new Change<>(singleton("b"), singleton("a")), 15L));
assertEquals(ValueAndTimestamp.make(singleton("b"), 15L), myStore.get("A"));
- context.setTime(12L);
- reduceProcessor.process("A", new Change<>(null, singleton("b")));
+ reduceProcessor.process(new Record<>("A", new Change<>(null, singleton("b")), 12L));
assertEquals(ValueAndTimestamp.make(emptySet(), 15L), myStore.get("A"));
}