You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/04 15:42:06 UTC
[kafka] branch trunk updated: MINOR: Removed deprecated schedule
function (#4908)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new af98326 MINOR: Removed deprecated schedule function (#4908)
af98326 is described below
commit af983267be7a2d0f81527f5a348af377f30caee4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri May 4 08:42:01 2018 -0700
MINOR: Removed deprecated schedule function (#4908)
While working on this, I also refactored the MockProcessor out of the MockProcessorSupplier to clean up the unit test paths.
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../examples/wordcount/WordCountProcessorDemo.java | 4 -
.../kafka/streams/kstream/KGroupedStream.java | 4 +-
.../kafka/streams/kstream/TimeWindowedKStream.java | 4 +-
.../apache/kafka/streams/kstream/Transformer.java | 21 ----
.../kstream/internals/KStreamTransform.java | 9 --
.../kstream/internals/KStreamTransformValues.java | 14 ---
.../kafka/streams/processor/AbstractProcessor.java | 17 +--
.../apache/kafka/streams/processor/Processor.java | 11 --
.../kafka/streams/processor/ProcessorContext.java | 12 ---
.../internals/GlobalProcessorContextImpl.java | 10 --
.../processor/internals/ProcessorContextImpl.java | 11 --
.../streams/processor/internals/ProcessorNode.java | 4 +-
.../processor/internals/StandbyContextImpl.java | 9 --
.../apache/kafka/streams/StreamsBuilderTest.java | 8 +-
.../org/apache/kafka/streams/TopologyTest.java | 4 -
.../streams/integration/EosIntegrationTest.java | 5 -
.../integration/RestoreIntegrationTest.java | 9 +-
.../kafka/streams/kstream/KStreamBuilderTest.java | 8 +-
.../kstream/internals/AbstractStreamTest.java | 6 +-
.../kstream/internals/KStreamBranchTest.java | 16 +--
.../kstream/internals/KStreamFilterTest.java | 16 +--
.../kstream/internals/KStreamFlatMapTest.java | 10 +-
.../internals/KStreamFlatMapValuesTest.java | 12 +--
.../internals/KStreamGlobalKTableJoinTest.java | 12 ++-
.../internals/KStreamGlobalKTableLeftJoinTest.java | 16 +--
.../streams/kstream/internals/KStreamImplTest.java | 27 +++--
.../kstream/internals/KStreamKStreamJoinTest.java | 40 +++----
.../internals/KStreamKStreamLeftJoinTest.java | 17 +--
.../kstream/internals/KStreamKTableJoinTest.java | 13 ++-
.../internals/KStreamKTableLeftJoinTest.java | 10 +-
.../streams/kstream/internals/KStreamMapTest.java | 10 +-
.../kstream/internals/KStreamMapValuesTest.java | 13 +--
.../kstream/internals/KStreamSelectKeyTest.java | 8 +-
.../kstream/internals/KStreamTransformTest.java | 120 ++++++++++-----------
.../internals/KStreamTransformValuesTest.java | 19 ++--
.../internals/KStreamWindowAggregateTest.java | 44 ++++----
.../kstream/internals/KTableAggregateTest.java | 46 ++++----
.../kstream/internals/KTableFilterTest.java | 81 +++++++-------
.../streams/kstream/internals/KTableImplTest.java | 24 ++---
.../internals/KTableKTableInnerJoinTest.java | 41 +++----
.../internals/KTableKTableLeftJoinTest.java | 24 +++--
.../internals/KTableKTableOuterJoinTest.java | 25 +++--
.../kstream/internals/KTableMapKeysTest.java | 8 +-
.../kstream/internals/KTableMapValuesTest.java | 31 +++---
.../kstream/internals/KTableSourceTest.java | 20 ++--
.../streams/processor/TopologyBuilderTest.java | 4 -
.../internals/AbstractProcessorContextTest.java | 3 -
.../processor/internals/GlobalStateTaskTest.java | 4 +-
.../internals/InternalTopologyBuilderTest.java | 3 -
.../processor/internals/ProcessorNodeTest.java | 10 --
.../processor/internals/ProcessorTopologyTest.java | 37 +------
.../processor/internals/PunctuationQueueTest.java | 96 ++++++-----------
.../processor/internals/StreamTaskTest.java | 18 ++--
.../processor/internals/StreamThreadTest.java | 4 -
.../kafka/test/InternalMockProcessorContext.java | 3 -
.../org/apache/kafka/test/KStreamTestDriver.java | 15 ---
...ckProcessorSupplier.java => MockProcessor.java} | 87 ++++++---------
.../org/apache/kafka/test/MockProcessorNode.java | 23 ++--
.../apache/kafka/test/MockProcessorSupplier.java | 85 +++------------
.../apache/kafka/test/NoOpProcessorContext.java | 4 -
.../streams/scala/kstream/KGroupedStream.scala | 1 -
.../kafka/streams/scala/kstream/KStream.scala | 8 --
.../scala/kstream/TimeWindowedKStream.scala | 1 -
.../streams/processor/MockProcessorContext.java | 8 --
.../kafka/streams/MockProcessorContextTest.java | 4 -
.../kafka/streams/TopologyTestDriverTest.java | 14 ---
66 files changed, 490 insertions(+), 815 deletions(-)
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index dbf2b70..523bb46 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -98,10 +98,6 @@ public class WordCountProcessorDemo {
}
@Override
- @Deprecated
- public void punctuate(long timestamp) {}
-
- @Override
public void close() {}
};
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 29de64c..d8589e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -177,7 +176,8 @@ public interface KGroupedStream<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
- * Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
+ * Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()}
+ * if there is no valueSerde provided
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 8ef0bd7..7f9752b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -101,7 +100,8 @@ public interface TimeWindowedKStream<K, V> {
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
- * Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
+ * Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()}
+ * if there is no valueSerde provided
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index a83b4a3..bbf8c25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.To;
/**
@@ -82,26 +81,6 @@ public interface Transformer<K, V, R> {
R transform(final K key, final V value);
/**
- * Perform any periodic operations and possibly generate new {@link KeyValue} pairs if this processor
- * {@link ProcessorContext#schedule(long) schedules itself} with the context during
- * {@link #init(ProcessorContext) initialization}.
- * <p>
- * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
- * {@link ProcessorContext#forward(Object, Object, To)} can be used.
- * <p>
- * Note that {@code punctuate} is called based on <it>stream time</it> (i.e., time progresses with regard to
- * timestamps return by the used {@link TimestampExtractor})
- * and not based on wall-clock time.
- *
- * @deprecated Please use {@link Punctuator} functional interface instead.
- *
- * @param timestamp the stream time when {@code punctuate} is being called
- * @return new {@link KeyValue} pair to be forwarded to down stream—if {@code null} will not be forwarded
- */
- @Deprecated
- R punctuate(final long timestamp);
-
- /**
* Close this processor and clean up any resources.
* <p>
* To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 0afadbb..1ae8ede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -59,15 +59,6 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
context().forward(pair.key, pair.value);
}
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(long timestamp) {
- KeyValue<? extends K2, ? extends V2> pair = transformer.punctuate(timestamp);
-
- if (pair != null)
- context().forward(pair.key, pair.value);
- }
-
@Override
public void close() {
transformer.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index e644597..d09fae2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -106,12 +106,6 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
return context.schedule(interval, type, callback);
}
- @SuppressWarnings("deprecation")
- @Override
- public void schedule(final long interval) {
- context.schedule(interval);
- }
-
@Override
public <K, V> void forward(final K key, final V value) {
throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
@@ -177,14 +171,6 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
context.forward(key, valueTransformer.transform(key, value));
}
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(long timestamp) {
- if (valueTransformer.punctuate(timestamp) != null) {
- throw new StreamsException("ValueTransformer#punctuate must return null.");
- }
- }
-
@Override
public void close() {
valueTransformer.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 14e6c2a..83abfca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor;
/**
* An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
- * implementations of {@link #punctuate(long)} and {@link #close()}.
+ * implementation of {@link #close()}.
*
* @param <K> the type of keys
* @param <V> the type of values
@@ -36,21 +36,6 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
}
/**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
- * during {@link #init(ProcessorContext) initialization}.
- * <p>
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- * </p>
- *
- * @param timestamp the wallclock time when this method is being called
- */
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(final long timestamp) {
- // do nothing
- }
-
- /**
* Close this processor and clean up any resources.
* <p>
* This method does nothing by default; if desired, subclasses should override it with custom functionality.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index 2ed17df..bcdb2f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -48,17 +48,6 @@ public interface Processor<K, V> {
void process(K key, V value);
/**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
- * during {@link #init(ProcessorContext) initialization}.
- *
- * @deprecated Please use {@link Punctuator} functional interface instead.
- *
- * @param timestamp the stream time when this method is being called
- */
- @Deprecated
- void punctuate(long timestamp);
-
- /**
* Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
* Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
* <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 404b225..93a1455 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -132,18 +132,6 @@ public interface ProcessorContext {
final Punctuator callback);
/**
- * Schedules a periodic operation for processors. A processor may call this method during
- * {@link Processor#init(ProcessorContext) initialization} to
- * schedule a periodic call - called a punctuation - to {@link Processor#punctuate(long)}.
- *
- * @deprecated Please use {@link #schedule(long, PunctuationType, Punctuator)} instead.
- *
- * @param interval the time interval between punctuations
- */
- @Deprecated
- void schedule(final long interval);
-
- /**
* Forwards a key/value pair to all downstream processors.
* Used the input record's timestamp as timestamp for the output record.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 6bc4121..717e6a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -96,14 +96,4 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
}
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @SuppressWarnings("deprecation")
- @Override
- public void schedule(long interval) {
- throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
- }
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 178937f..a539a1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -152,15 +152,4 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return task.schedule(interval, type, callback);
}
- @Override
- @Deprecated
- public void schedule(final long interval) {
- schedule(interval, PunctuationType.STREAM_TIME, new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- currentNode().processor().punctuate(timestamp);
- }
- });
- }
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 0854b67..a0a7041 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -37,9 +37,9 @@ public class ProcessorNode<K, V> {
private final List<ProcessorNode<?, ?>> children;
private final Map<String, ProcessorNode<?, ?>> childByName;
- private final String name;
- private final Processor<K, V> processor;
private NodeMetrics nodeMetrics;
+ private final Processor<K, V> processor;
+ private final String name;
private final Time time;
private K key;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ef4585a..6aeca44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -188,15 +188,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
* @throws UnsupportedOperationException on every invocation
*/
@Override
- @Deprecated
- public void schedule(final long interval) {
- throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
public RecordContext recordContext() {
throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index d3e01fa..15e55d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -189,7 +189,7 @@ public class StreamsBuilderTest {
driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
// no exception was thrown
- assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -208,8 +208,8 @@ public class StreamsBuilderTest {
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
- assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
- assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
+ assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -232,7 +232,7 @@ public class StreamsBuilderTest {
driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
- assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index eee3386..0c34723 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -296,10 +296,6 @@ public class TopologyTest {
@Override
public void process(Object key, Object value) { }
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(long timestamp) { }
-
@Override
public void close() { }
};
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index c4ea964..30c90c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -644,11 +644,6 @@ public class EosIntegrationTest {
}
@Override
- public KeyValue<Long, Long> punctuate(final long timestamp) {
- return null;
- }
-
- @Override
public void close() { }
};
} }, storeNames)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 19ddedf..12b0d97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -57,7 +57,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -106,13 +105,12 @@ public class RestoreIntegrationTest {
}
@After
- public void shutdown() throws IOException {
+ public void shutdown() {
if (kafkaStreams != null) {
kafkaStreams.close(30, TimeUnit.SECONDS);
}
}
-
@Test
public void shouldRestoreState() throws ExecutionException, InterruptedException {
final AtomicInteger numReceived = new AtomicInteger(0);
@@ -276,11 +274,6 @@ public class RestoreIntegrationTest {
}
@Override
- public void punctuate(final long timestamp) {
-
- }
-
- @Override
public void close() {
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 81bdb31..b63f2de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -117,7 +117,7 @@ public class KStreamBuilderTest {
driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
// no exception was thrown
- assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -134,8 +134,8 @@ public class KStreamBuilderTest {
driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
- assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
- assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
+ assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -170,7 +170,7 @@ public class KStreamBuilderTest {
driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
- assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 2aa07f3..1f9bcba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -76,12 +76,12 @@ public class AbstractStreamTest {
public void testShouldBeExtensible() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
- final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
final String topicName = "topic";
ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())));
- stream.randomFilter().process(processor);
+ stream.randomFilter().process(supplier);
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
@@ -94,7 +94,7 @@ public class AbstractStreamTest {
driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
- assertTrue(processor.processed.size() <= expectedKeys.length);
+ assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length);
}
private class ExtendedKStream<K, V> extends AbstractStream<K> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index a70bc37..bd3d60b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -26,13 +26,14 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Array;
+import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -90,17 +91,15 @@ public class KStreamBranchTest {
KStream<Integer, String> stream;
KStream<Integer, String>[] branches;
- MockProcessorSupplier<Integer, String>[] processors;
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
branches = stream.branch(isEven, isMultipleOfThree, isOdd);
assertEquals(3, branches.length);
- processors = (MockProcessorSupplier<Integer, String>[]) Array.newInstance(MockProcessorSupplier.class, branches.length);
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
for (int i = 0; i < branches.length; i++) {
- processors[i] = new MockProcessorSupplier<>();
- branches[i].process(processors[i]);
+ branches[i].process(supplier);
}
driver = new TopologyTestDriver(builder.build(), props);
@@ -108,9 +107,10 @@ public class KStreamBranchTest {
driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
- assertEquals(3, processors[0].processed.size());
- assertEquals(1, processors[1].processed.size());
- assertEquals(2, processors[2].processed.size());
+ final List<MockProcessor<Integer, String>> processors = supplier.capturedProcessors(3);
+ assertEquals(3, processors.get(0).processed.size());
+ assertEquals(1, processors.get(1).processed.size());
+ assertEquals(2, processors.get(2).processed.size());
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index a67d688..d338fe3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -74,18 +74,18 @@ public class KStreamFilterTest {
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
KStream<Integer, String> stream;
- MockProcessorSupplier<Integer, String> processor;
+ MockProcessorSupplier<Integer, String> supplier;
- processor = new MockProcessorSupplier<>();
+ supplier = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
- stream.filter(isMultipleOfThree).process(processor);
+ stream.filter(isMultipleOfThree).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
- assertEquals(2, processor.processed.size());
+ assertEquals(2, supplier.theCapturedProcessor().processed.size());
}
@Test
@@ -94,18 +94,18 @@ public class KStreamFilterTest {
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
KStream<Integer, String> stream;
- MockProcessorSupplier<Integer, String> processor;
+ MockProcessorSupplier<Integer, String> supplier;
- processor = new MockProcessorSupplier<>();
+ supplier = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
- stream.filterNot(isMultipleOfThree).process(processor);
+ stream.filterNot(isMultipleOfThree).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
- assertEquals(5, processor.processed.size());
+ assertEquals(5, supplier.theCapturedProcessor().processed.size());
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index e414218..9ce24b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -82,23 +82,23 @@ public class KStreamFlatMapTest {
final int[] expectedKeys = {0, 1, 2, 3};
KStream<Integer, String> stream;
- MockProcessorSupplier<String, String> processor;
+ MockProcessorSupplier<String, String> supplier;
- processor = new MockProcessorSupplier<>();
+ supplier = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
- stream.flatMap(mapper).process(processor);
+ stream.flatMap(mapper).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
- assertEquals(6, processor.processed.size());
+ assertEquals(6, supplier.theCapturedProcessor().processed.size());
String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"};
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 14213c9..221b02b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -80,8 +80,8 @@ public class KStreamFlatMapValuesTest {
final int[] expectedKeys = {0, 1, 2, 3};
final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
- final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
- stream.flatMapValues(mapper).process(processor);
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ stream.flatMapValues(mapper).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (final int expectedKey : expectedKeys) {
@@ -91,7 +91,7 @@ public class KStreamFlatMapValuesTest {
String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
- assertArrayEquals(expected, processor.processed.toArray());
+ assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@@ -113,9 +113,9 @@ public class KStreamFlatMapValuesTest {
final int[] expectedKeys = {0, 1, 2, 3};
final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
- final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- stream.flatMapValues(mapper).process(processor);
+ stream.flatMapValues(mapper).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (final int expectedKey : expectedKeys) {
@@ -125,6 +125,6 @@ public class KStreamFlatMapValuesTest {
String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
- assertArrayEquals(expected, processor.processed.toArray());
+ assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 2936f5f..6e5b816 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -36,7 +37,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
@@ -50,19 +50,19 @@ public class KStreamGlobalKTableJoinTest {
private final Serde<Integer> intSerde = Serdes.Integer();
private final Serde<String> stringSerde = Serdes.String();
private TopologyTestDriver driver;
- private MockProcessorSupplier<Integer, String> processor;
+ private MockProcessor<Integer, String> processor;
private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
- public void setUp() throws IOException {
+ public void setUp() {
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
final GlobalKTable<String, String> table; // value of stream optionally contains key of table
final KeyValueMapper<Integer, String, String> keyMapper;
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
stream = builder.stream(streamTopic, streamConsumed);
@@ -76,7 +76,7 @@ public class KStreamGlobalKTableJoinTest {
return tokens.length > 1 ? tokens[1] : null;
}
};
- stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+ stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
@@ -86,6 +86,8 @@ public class KStreamGlobalKTableJoinTest {
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
driver = new TopologyTestDriver(builder.build(), props);
+
+ processor = supplier.theCapturedProcessor();
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 8882113..b3551ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -29,13 +29,13 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
@@ -48,20 +48,22 @@ public class KStreamGlobalKTableLeftJoinTest {
final private String globalTableTopic = "globalTableTopic";
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+
+ private MockProcessor<Integer, String> processor;
private TopologyTestDriver driver;
- private MockProcessorSupplier<Integer, String> processor;
- private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
+ private final int[] expectedKeys = {0, 1, 2, 3};
+
@Before
- public void setUp() throws IOException {
+ public void setUp() {
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
final GlobalKTable<String, String> table; // value of stream optionally contains key of table
final KeyValueMapper<Integer, String, String> keyMapper;
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
stream = builder.stream(streamTopic, streamConsumed);
@@ -75,7 +77,7 @@ public class KStreamGlobalKTableLeftJoinTest {
return tokens.length > 1 ? tokens[1] : null;
}
};
- stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+ stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
@@ -85,6 +87,8 @@ public class KStreamGlobalKTableLeftJoinTest {
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
driver = new TopologyTestDriver(builder.build(), props);
+
+ processor = supplier.theCapturedProcessor();
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index f397246..797575d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -69,12 +69,15 @@ import static org.junit.Assert.fail;
public class KStreamImplTest {
- final private Serde<String> stringSerde = Serdes.String();
- final private Serde<Integer> intSerde = Serdes.Integer();
+ private final Serde<String> stringSerde = Serdes.String();
+ private final Serde<Integer> intSerde = Serdes.Integer();
+ private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
+
+ private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+
private KStream<String, String> testStream;
private StreamsBuilder builder;
- private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
private TopologyTestDriver driver;
@@ -222,12 +225,11 @@ public class KStreamImplTest {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(input, consumed);
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
driver = new TopologyTestDriver(builder.build(), props);
driver.pipeInput(recordFactory.create(input, "a", "b"));
- assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b")));
+ assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b")));
}
@Test
@@ -235,13 +237,12 @@ public class KStreamImplTest {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(input, consumed);
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
stream.to("to-topic", Produced.with(stringSerde, stringSerde));
builder.stream("to-topic", consumed).process(processorSupplier);
driver = new TopologyTestDriver(builder.build(), props);
driver.pipeInput(recordFactory.create(input, "e", "f"));
- assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f")));
+ assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f")));
}
@Test
@@ -519,7 +520,6 @@ public class KStreamImplTest {
final KStream<String, String> source2 = builder.stream(topic2);
final KStream<String, String> merged = source1.merge(source2);
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -529,7 +529,7 @@ public class KStreamImplTest {
driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
- assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -545,7 +545,6 @@ public class KStreamImplTest {
final KStream<String, String> source4 = builder.stream(topic4);
final KStream<String, String> merged = source1.merge(source2).merge(source3).merge(source4);
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -560,14 +559,13 @@ public class KStreamImplTest {
driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
- processorSupplier.processed);
+ processorSupplier.theCapturedProcessor().processed);
}
@Test
public void shouldProcessFromSourceThatMatchPattern() {
final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d"));
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
pattern2Source.process(processorSupplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -579,7 +577,7 @@ public class KStreamImplTest {
driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
- processorSupplier.processed);
+ processorSupplier.theCapturedProcessor().processed);
}
@Test
@@ -591,7 +589,6 @@ public class KStreamImplTest {
final KStream<String, String> source3 = builder.stream(topic3);
final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3);
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -603,6 +600,6 @@ public class KStreamImplTest {
driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
- processorSupplier.processed);
+ processorSupplier.theCapturedProcessor().processed);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 63a040a..5d849ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -119,9 +120,7 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.join(
@@ -129,7 +128,7 @@ public class KStreamKStreamJoinTest {
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
Joined.with(intSerde, stringSerde, stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -138,6 +137,8 @@ public class KStreamKStreamJoinTest {
driver = new TopologyTestDriver(builder.build(), props);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
// push two items to the primary stream. the other window is empty
// w1 = {}
// w2 = {}
@@ -220,9 +221,7 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -231,7 +230,7 @@ public class KStreamKStreamJoinTest {
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
Joined.with(intSerde, stringSerde, stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
@@ -239,6 +238,8 @@ public class KStreamKStreamJoinTest {
driver = new TopologyTestDriver(builder.build(), props, 0L);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
// push two items to the primary stream. the other window is empty.this should produce two items
// w1 = {}
// w2 = {}
@@ -323,9 +324,7 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -334,7 +333,7 @@ public class KStreamKStreamJoinTest {
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
Joined.with(intSerde, stringSerde, stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -352,6 +351,8 @@ public class KStreamKStreamJoinTest {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
}
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
processor.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items.
@@ -543,9 +544,7 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -556,7 +555,7 @@ public class KStreamKStreamJoinTest {
Joined.with(intSerde,
stringSerde,
stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -565,6 +564,8 @@ public class KStreamKStreamJoinTest {
driver = new TopologyTestDriver(builder.build(), props, time);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
}
@@ -653,9 +654,8 @@ public class KStreamKStreamJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- processor = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -664,7 +664,7 @@ public class KStreamKStreamJoinTest {
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(0).before(100),
Joined.with(intSerde, stringSerde, stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -673,6 +673,8 @@ public class KStreamKStreamJoinTest {
driver = new TopologyTestDriver(builder.build(), props, time);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index cb1aaf1..c67e13d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -83,9 +84,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -93,7 +92,7 @@ public class KStreamKStreamLeftJoinTest {
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
Joined.with(intSerde, stringSerde, stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -102,6 +101,8 @@ public class KStreamKStreamLeftJoinTest {
driver = new TopologyTestDriver(builder.build(), props, 0L);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
// push two items to the primary stream. the other window is empty
// w1 {}
// w2 {}
@@ -168,9 +169,7 @@ public class KStreamKStreamLeftJoinTest {
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
@@ -178,7 +177,7 @@ public class KStreamKStreamLeftJoinTest {
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
Joined.with(intSerde, stringSerde, stringSerde));
- joined.process(processor);
+ joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -187,6 +186,8 @@ public class KStreamKStreamLeftJoinTest {
driver = new TopologyTestDriver(builder.build(), props, time);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
// push two items to the primary stream. the other window is empty. this should produce two items
// w1 = {}
// w2 = {}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 5b2a797..ec31b5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -54,9 +55,11 @@ public class KStreamKTableJoinTest {
private final Serde<Integer> intSerde = Serdes.Integer();
private final Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private MockProcessorSupplier<Integer, String> processor;
+
private final int[] expectedKeys = {0, 1, 2, 3};
+
+ private MockProcessor<Integer, String> processor;
+ private TopologyTestDriver driver;
private StreamsBuilder builder;
@Before
@@ -66,11 +69,11 @@ public class KStreamKTableJoinTest {
final KStream<Integer, String> stream;
final KTable<Integer, String> table;
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
stream = builder.stream(streamTopic, consumed);
table = builder.table(tableTopic, consumed);
- stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
+ stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
@@ -80,6 +83,8 @@ public class KStreamKTableJoinTest {
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+ processor = supplier.theCapturedProcessor();
}
private void pushToStream(final int messageCount, final String valuePrefix) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 669f4c7..735f71c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
@@ -51,7 +52,8 @@ public class KStreamKTableLeftJoinTest {
final private Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
private TopologyTestDriver driver;
- private MockProcessorSupplier<Integer, String> processor;
+ private MockProcessor<Integer, String> processor;
+
private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@@ -63,11 +65,11 @@ public class KStreamKTableLeftJoinTest {
final KStream<Integer, String> stream;
final KTable<Integer, String> table;
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
stream = builder.stream(streamTopic, consumed);
table = builder.table(tableTopic, consumed);
- stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
+ stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
@@ -77,6 +79,8 @@ public class KStreamKTableLeftJoinTest {
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+ processor = supplier.theCapturedProcessor();
}
private void pushToStream(final int messageCount, final String valuePrefix) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index bb22204..b0a383b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -81,22 +81,22 @@ public class KStreamMapTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
- MockProcessorSupplier<String, Integer> processor;
+ MockProcessorSupplier<String, Integer> supplier;
- processor = new MockProcessorSupplier<>();
- stream.map(mapper).process(processor);
+ supplier = new MockProcessorSupplier<>();
+ stream.map(mapper).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
- assertEquals(4, processor.processed.size());
+ assertEquals(4, supplier.theCapturedProcessor().processed.size());
String[] expected = new String[]{"V0:0", "V1:1", "V2:2", "V3:3"};
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 17a13e0..ed11038 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -44,6 +44,9 @@ public class KStreamMapValuesTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
+
+
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
private TopologyTestDriver driver;
private final Properties props = new Properties();
@@ -81,9 +84,8 @@ public class KStreamMapValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
KStream<Integer, String> stream;
- MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
- stream.mapValues(mapper).process(processor);
+ stream.mapValues(mapper).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
@@ -91,7 +93,7 @@ public class KStreamMapValuesTest {
}
String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
- assertArrayEquals(expected, processor.processed.toArray());
+ assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@Test
@@ -109,9 +111,8 @@ public class KStreamMapValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
KStream<Integer, String> stream;
- MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
- stream.mapValues(mapper).process(processor);
+ stream.mapValues(mapper).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
@@ -119,7 +120,7 @@ public class KStreamMapValuesTest {
}
String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
- assertArrayEquals(expected, processor.processed.toArray());
+ assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 0bf6452..1abc0b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -90,9 +90,9 @@ public class KStreamSelectKeyTest {
KStream<String, Integer> stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
- MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- stream.selectKey(selector).process(processor);
+ stream.selectKey(selector).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -100,10 +100,10 @@ public class KStreamSelectKeyTest {
driver.pipeInput(recordFactory.create(expectedValue));
}
- assertEquals(3, processor.processed.size());
+ assertEquals(3, supplier.theCapturedProcessor().processed.size());
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index aa0cf7e..1567fe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -48,6 +48,7 @@ public class KStreamTransformTest {
private String topicName = "topic";
final private Serde<Integer> intSerde = Serdes.Integer();
+
private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
private TopologyTestDriver driver;
private final Properties props = new Properties();
@@ -77,34 +78,26 @@ public class KStreamTransformTest {
public void testTransform() {
StreamsBuilder builder = new StreamsBuilder();
- TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
- new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
- public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
- return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
-
- private int total = 0;
+ final TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier = new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+ public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+ return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
- @Override
- public void init(ProcessorContext context) {
- }
+ private int total = 0;
- @Override
- public KeyValue<Integer, Integer> transform(Number key, Number value) {
- total += value.intValue();
- return KeyValue.pair(key.intValue() * 2, total);
- }
+ @Override
+ public void init(final ProcessorContext context) {}
- @Override
- public KeyValue<Integer, Integer> punctuate(long timestamp) {
- return KeyValue.pair(-1, (int) timestamp);
- }
+ @Override
+ public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
+ total += value.intValue();
+ return KeyValue.pair(key.intValue() * 2, total);
+ }
- @Override
- public void close() {
- }
- };
- }
- };
+ @Override
+ public void close() {}
+ };
+ }
+ };
final int[] expectedKeys = {1, 10, 100, 1000};
@@ -117,15 +110,18 @@ public class KStreamTransformTest {
kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
}
- kstreamDriver.punctuate(2);
- kstreamDriver.punctuate(3);
+ // TODO: un-comment after replaced with TopologyTestDriver
+ //kstreamDriver.punctuate(2);
+ //kstreamDriver.punctuate(3);
- assertEquals(6, processor.processed.size());
+ //assertEquals(6, processor.theCapturedProcessor().processed.size());
- String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+ //String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+
+ String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
}
}
@@ -133,40 +129,34 @@ public class KStreamTransformTest {
public void testTransformWithNewDriverAndPunctuator() {
StreamsBuilder builder = new StreamsBuilder();
- TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
- new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
- public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
- return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
-
- private int total = 0;
-
- @Override
- public void init(final ProcessorContext context) {
- context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- context.forward(-1, (int) timestamp);
- }
- });
- }
-
- @Override
- public KeyValue<Integer, Integer> transform(Number key, Number value) {
- total += value.intValue();
- return KeyValue.pair(key.intValue() * 2, total);
- }
-
- @Override
- public KeyValue<Integer, Integer> punctuate(long timestamp) {
- return null;
- }
-
- @Override
- public void close() {
- }
- };
- }
- };
+ TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier = new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+ public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+ return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
+
+ private int total = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ context.forward(-1, (int) timestamp);
+ }
+ });
+ }
+
+ @Override
+ public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
+ total += value.intValue();
+ return KeyValue.pair(key.intValue() * 2, total);
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+
final int[] expectedKeys = {1, 10, 100, 1000};
@@ -184,12 +174,12 @@ public class KStreamTransformTest {
// This tick further advances the clock to 3, which leads to the "-1:3" result
driver.advanceWallClockTime(1);
- assertEquals(6, processor.processed.size());
+ assertEquals(6, processor.theCapturedProcessor().processed.size());
String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 59a6a21..6bfc813 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -49,6 +49,8 @@ public class KStreamTransformValuesTest {
private String topicName = "topic";
final private Serde<Integer> intSerde = Serdes.Integer();
+ final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
+
private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
private TopologyTestDriver driver;
private final Properties props = new Properties();
@@ -107,9 +109,8 @@ public class KStreamTransformValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
KStream<Integer, Integer> stream;
- MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
- stream.transformValues(valueTransformerSupplier).process(processor);
+ stream.transformValues(valueTransformerSupplier).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -118,7 +119,7 @@ public class KStreamTransformValuesTest {
}
String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
- assertArrayEquals(expected, processor.processed.toArray());
+ assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@Test
@@ -151,9 +152,8 @@ public class KStreamTransformValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
KStream<Integer, Integer> stream;
- MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
- stream.transformValues(valueTransformerSupplier).process(processor);
+ stream.transformValues(valueTransformerSupplier).process(supplier);
driver = new TopologyTestDriver(builder.build(), props);
@@ -162,7 +162,7 @@ public class KStreamTransformValuesTest {
}
String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
- assertArrayEquals(expected, processor.processed.toArray());
+ assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@@ -226,13 +226,6 @@ public class KStreamTransformValuesTest {
} catch (final StreamsException e) {
// expected
}
-
- try {
- transformValueProcessor.punctuate(0);
- fail("should not allow ValueTransformer#puntuate() to return not-null value");
- } catch (final StreamsException e) {
- // expected
- }
}
private static final class BadValueTransformer implements ValueTransformerWithKey<Integer, Integer, Integer> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index fc31db9..9050edb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -37,12 +37,14 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.List;
import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -85,8 +87,8 @@ public class KStreamWindowAggregateTest {
.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
- final MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
driver = new TopologyTestDriver(builder.build(), props, 0L);
@@ -128,7 +130,7 @@ public class KStreamWindowAggregateTest {
"[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2",
"[C@5/15]:0+3+3", "[C@10/20]:0+3"
),
- proc2.processed
+ supplier.theCapturedProcessor().processed
);
}
@@ -143,24 +145,22 @@ public class KStreamWindowAggregateTest {
.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
- final MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
- table1.toStream().process(proc1);
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table1.toStream().process(supplier);
final KTable<Windowed<String>, String> table2 = builder
.stream(topic2, Consumed.with(strSerde, strSerde)).groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic2-Canonized");
- final MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
+ table2.toStream().process(supplier);
- final MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
table1.join(table2, new ValueJoiner<String, String, String>() {
@Override
public String apply(final String p1, final String p2) {
return p1 + "%" + p2;
}
- }).toStream().process(proc3);
+ }).toStream().process(supplier);
driver = new TopologyTestDriver(builder.build(), props, 0L);
@@ -170,15 +170,17 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
- proc1.checkAndClearProcessResult(
+ final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+ processors.get(0).checkAndClearProcessResult(
"[A@0/10]:0+1",
"[B@0/10]:0+2",
"[C@0/10]:0+3",
"[D@0/10]:0+4",
"[A@0/10]:0+1+1"
);
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
@@ -186,15 +188,15 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
- proc1.checkAndClearProcessResult(
+ processors.get(0).checkAndClearProcessResult(
"[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
"[B@0/10]:0+2+2", "[B@5/15]:0+2",
"[D@0/10]:0+4+4", "[D@5/15]:0+4",
"[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
"[C@0/10]:0+3+3", "[C@5/15]:0+3"
);
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
@@ -202,15 +204,15 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(
"[A@0/10]:0+a",
"[B@0/10]:0+b",
"[C@0/10]:0+c",
"[D@0/10]:0+d",
"[A@0/10]:0+a+a"
);
- proc3.checkAndClearProcessResult(
+ processors.get(2).checkAndClearProcessResult(
"[A@0/10]:0+1+1+1%0+a",
"[B@0/10]:0+2+2+2%0+b",
"[C@0/10]:0+3+3%0+c",
@@ -223,15 +225,15 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(
"[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
"[B@0/10]:0+b+b", "[B@5/15]:0+b",
"[D@0/10]:0+d+d", "[D@5/15]:0+d",
"[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
"[C@0/10]:0+c+c", "[C@5/15]:0+c"
);
- proc3.checkAndClearProcessResult(
+ processors.get(2).checkAndClearProcessResult(
"[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
"[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
"[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index df8d292..a769b49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -50,9 +51,10 @@ import static org.junit.Assert.assertEquals;
public class KTableAggregateTest {
- final private Serde<String> stringSerde = Serdes.String();
+ private final Serde<String> stringSerde = Serdes.String();
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde);
+ private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
private File stateDir = null;
@@ -70,7 +72,7 @@ public class KTableAggregateTest {
public void testAggBasic() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
- final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+
KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
@@ -81,7 +83,7 @@ public class KTableAggregateTest {
stringSerde,
"topic1-Canonized");
- table2.toStream().process(proc);
+ table2.toStream().process(supplier);
driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
@@ -110,7 +112,7 @@ public class KTableAggregateTest {
"C:0+5",
"D:0+6",
"B:0+2-2+4-4+7",
- "C:0+5-5+8"), proc.processed);
+ "C:0+5-5+8"), supplier.theCapturedProcessor().processed);
}
@@ -118,7 +120,6 @@ public class KTableAggregateTest {
public void testAggCoalesced() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
- final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
@@ -129,7 +130,7 @@ public class KTableAggregateTest {
stringSerde,
"topic1-Canonized");
- table2.toStream().process(proc);
+ table2.toStream().process(supplier);
driver.setUp(builder, stateDir);
@@ -138,7 +139,7 @@ public class KTableAggregateTest {
driver.process(topic1, "A", "4");
driver.flushState();
assertEquals(Utils.mkList(
- "A:0+4"), proc.processed);
+ "A:0+4"), supplier.theCapturedProcessor().processed);
}
@@ -146,7 +147,6 @@ public class KTableAggregateTest {
public void testAggRepartition() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
- final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@@ -170,7 +170,7 @@ public class KTableAggregateTest {
stringSerde,
"topic1-Canonized");
- table2.toStream().process(proc);
+ table2.toStream().process(supplier);
driver.setUp(builder, stateDir);
@@ -200,10 +200,10 @@ public class KTableAggregateTest {
"2:0+2-2", "4:0+4",
//noop
"4:0+4-4", "7:0+7"
- ), proc.processed);
+ ), supplier.theCapturedProcessor().processed);
}
- private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) {
+ private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Object> supplier) {
driver.setUp(builder, stateDir);
driver.process(input, "A", "green");
@@ -225,53 +225,53 @@ public class KTableAggregateTest {
"green:1", "blue:1",
"yellow:1",
"green:2"
- ), proc.processed);
+ ), supplier.theCapturedProcessor().processed);
}
@Test
public void testCount() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "count-test-input";
- final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
builder.table(input, consumed)
.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
.count("count")
.toStream()
- .process(proc);
+ .process(supplier);
- testCountHelper(builder, input, proc);
+ testCountHelper(builder, input, supplier);
}
@Test
public void testCountWithInternalStore() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "count-test-input";
- final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
builder.table(input, consumed)
.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
.count()
.toStream()
- .process(proc);
+ .process(supplier);
- testCountHelper(builder, input, proc);
+ testCountHelper(builder, input, supplier);
}
@Test
public void testCountCoalesced() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "count-test-input";
- final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Long> supplier = new MockProcessorSupplier<>();
builder.table(input, consumed)
.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
.count("count")
.toStream()
- .process(proc);
+ .process(supplier);
driver.setUp(builder, stateDir);
+ final MockProcessor<String, Long> proc = supplier.theCapturedProcessor();
+
driver.process(input, "A", "green");
driver.process(input, "B", "green");
driver.process(input, "A", "blue");
@@ -291,7 +291,7 @@ public class KTableAggregateTest {
public void testRemoveOldBeforeAddNew() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "count-test-input";
- final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
builder.table(input, consumed)
.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@@ -321,10 +321,12 @@ public class KTableAggregateTest {
}
}, Serdes.String(), "someStore")
.toStream()
- .process(proc);
+ .process(supplier);
driver.setUp(builder, stateDir);
+ final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+
driver.process(input, "11", "A");
driver.flushState();
driver.process(input, "12", "B");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 657e05d..bde771b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -21,11 +21,13 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockMapper;
@@ -35,6 +37,7 @@ import org.junit.Rule;
import org.junit.Test;
import java.io.File;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -57,10 +60,9 @@ public class KTableFilterTest {
final KTable<String, Integer> table2,
final KTable<String, Integer> table3,
final String topic) {
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
- table3.toStream().process(proc3);
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
+ table3.toStream().process(supplier);
driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
@@ -73,8 +75,10 @@ public class KTableFilterTest {
driver.process(topic, "B", null);
driver.flushState();
- proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
- proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+ final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+ processors.get(0).checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+ processors.get(1).checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
}
@Test
@@ -269,11 +273,10 @@ public class KTableFilterTest {
final KTableImpl<String, Integer, Integer> table1,
final KTableImpl<String, Integer, Integer> table2,
final String topic1) {
- MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc1", proc1, table1.name);
- builder.build().addProcessor("proc2", proc2, table2.name);
+ builder.build().addProcessor("proc1", supplier, table1.name);
+ builder.build().addProcessor("proc2", supplier, table2.name);
driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
@@ -282,25 +285,27 @@ public class KTableFilterTest {
driver.process(topic1, "C", 1);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+ final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+ processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", 3);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(3<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)");
+ processors.get(0).checkAndClearProcessResult("A:(3<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null)");
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
}
@@ -348,11 +353,11 @@ public class KTableFilterTest {
final String topic1) {
table2.enableSendingOldValues();
- MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ final Topology topology = builder.build();
- builder.build().addProcessor("proc1", proc1, table1.name);
- builder.build().addProcessor("proc2", proc2, table2.name);
+ topology.addProcessor("proc1", supplier, table1.name);
+ topology.addProcessor("proc2", supplier, table2.name);
driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
@@ -361,25 +366,27 @@ public class KTableFilterTest {
driver.process(topic1, "C", 1);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc2.checkEmptyAndClearProcessResult();
+ final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+ processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ processors.get(1).checkEmptyAndClearProcessResult();
driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
- proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+ processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", 3);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(3<-2)");
- proc2.checkAndClearProcessResult("A:(null<-2)");
+ processors.get(0).checkAndClearProcessResult("A:(3<-2)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-2)");
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
driver.flushState();
- proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
- proc2.checkAndClearProcessResult("B:(null<-2)");
+ processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+ processors.get(1).checkAndClearProcessResult("B:(null<-2)");
}
@Test
@@ -424,11 +431,11 @@ public class KTableFilterTest {
final KTableImpl<String, String, String> table1,
final KTableImpl<String, String, String> table2,
final String topic1) {
- MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
- MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+ final Topology topology = builder.build();
- builder.build().addProcessor("proc1", proc1, table1.name);
- builder.build().addProcessor("proc2", proc2, table2.name);
+ topology.addProcessor("proc1", supplier, table1.name);
+ topology.addProcessor("proc2", supplier, table2.name);
driver.setUp(builder, stateDir, stringSerde, stringSerde);
@@ -436,8 +443,10 @@ public class KTableFilterTest {
driver.process(topic1, "B", "reject");
driver.process(topic1, "C", "reject");
driver.flushState();
- proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
- proc2.checkEmptyAndClearProcessResult();
+
+ final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
+ processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
+ processors.get(1).checkEmptyAndClearProcessResult();
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index a7aed2e..ae1e285 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
@@ -47,6 +48,7 @@ import org.junit.Test;
import java.io.File;
import java.lang.reflect.Field;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -80,8 +82,8 @@ public class KTableImplTest {
KTable<String, String> table1 = builder.table(topic1, consumed);
- MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
- table1.toStream().process(proc1);
+ MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+ table1.toStream().process(supplier);
KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
@Override
@@ -90,8 +92,7 @@ public class KTableImplTest {
}
});
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
+ table2.toStream().process(supplier);
KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
@Override
@@ -100,13 +101,11 @@ public class KTableImplTest {
}
});
- MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
- table3.toStream().process(proc3);
+ table3.toStream().process(supplier);
KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2);
- MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
- table4.toStream().process(proc4);
+ table4.toStream().process(supplier);
driver.setUp(builder, stateDir);
@@ -120,10 +119,11 @@ public class KTableImplTest {
driver.flushState();
driver.flushState();
- assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed);
- assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
- assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed);
- assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed);
+ final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+ assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(0).processed);
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), processors.get(1).processed);
+ assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), processors.get(2).processed);
+ assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(3).processed);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 9f5603b..0ca388f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
@@ -67,7 +68,7 @@ public class KTableKTableInnerJoinTest {
private void doTestJoin(final StreamsBuilder builder,
final int[] expectedKeys,
- final MockProcessorSupplier<Integer, String> processor,
+ final MockProcessorSupplier<Integer, String> supplier,
final KTable<Integer, String> joined) {
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -79,6 +80,8 @@ public class KTableKTableInnerJoinTest {
driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
driver.setTime(0L);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
final KTableValueGetter<Integer, String> getter = getterSupplier.get();
getter.init(driver.context());
@@ -168,15 +171,13 @@ public class KTableKTableInnerJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
- joined.toStream().process(processor);
+ joined.toStream().process(supplier);
- doTestJoin(builder, expectedKeys, processor, joined);
+ doTestJoin(builder, expectedKeys, supplier, joined);
}
@Test
@@ -203,13 +204,15 @@ public class KTableKTableInnerJoinTest {
final int[] expectedKeys,
final KTable<Integer, String> table1,
final KTable<Integer, String> table2,
- final MockProcessorSupplier<Integer, String> proc,
+ final MockProcessorSupplier<Integer, String> supplier,
final KTable<Integer, String> joined,
final boolean sendOldValues) {
driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
driver.setTime(0L);
+ final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
if (!sendOldValues) {
assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
@@ -288,15 +291,15 @@ public class KTableKTableInnerJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
- doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
+ doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false);
}
@@ -309,15 +312,15 @@ public class KTableKTableInnerJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
- doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
+ doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false);
}
@@ -330,16 +333,16 @@ public class KTableKTableInnerJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
- doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);
+ doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, true);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 6331b57..2eef302 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
@@ -80,9 +81,8 @@ public class KTableKTableLeftJoinTest {
final KTable<Integer, String> table1 = builder.table(topic1, consumed);
final KTable<Integer, String> table2 = builder.table(topic2, consumed);
final KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
- final MockProcessorSupplier<Integer, String> processor;
- processor = new MockProcessorSupplier<>();
- joined.toStream().process(processor);
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+ joined.toStream().process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -94,6 +94,8 @@ public class KTableKTableLeftJoinTest {
driver.setUp(builder, stateDir);
driver.setTime(0L);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
final KTableValueGetter<Integer, String> getter = getterSupplier.get();
getter.init(driver.context());
@@ -174,18 +176,20 @@ public class KTableKTableLeftJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
driver.setUp(builder, stateDir);
driver.setTime(0L);
+ final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
@@ -255,7 +259,7 @@ public class KTableKTableLeftJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
@@ -263,12 +267,14 @@ public class KTableKTableLeftJoinTest {
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
driver.setUp(builder, stateDir);
driver.setTime(0L);
+ final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 16694d8..cf3321f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
@@ -74,13 +75,13 @@ public class KTableKTableOuterJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> processor;
+ final MockProcessorSupplier<Integer, String> supplier;
- processor = new MockProcessorSupplier<>();
+ supplier = new MockProcessorSupplier<>();
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
- joined.toStream().process(processor);
+ joined.toStream().process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -91,6 +92,8 @@ public class KTableKTableOuterJoinTest {
driver.setUp(builder, stateDir);
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
final KTableValueGetter<Integer, String> getter = getterSupplier.get();
getter.init(driver.context());
@@ -179,17 +182,19 @@ public class KTableKTableOuterJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
driver.setUp(builder, stateDir);
+ final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
@@ -267,7 +272,7 @@ public class KTableKTableOuterJoinTest {
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
- final MockProcessorSupplier<Integer, String> proc;
+ final MockProcessorSupplier<Integer, String> supplier;
table1 = builder.table(topic1, consumed);
table2 = builder.table(topic2, consumed);
@@ -275,11 +280,13 @@ public class KTableKTableOuterJoinTest {
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
- proc = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ supplier = new MockProcessorSupplier<>();
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
driver.setUp(builder, stateDir);
+ final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 81797cb..78c7902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -78,9 +78,9 @@ public class KTableMapKeysTest {
final int[] originalKeys = new int[]{1, 2, 3};
final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
- MockProcessorSupplier<String, String> processor = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
- convertedStream.process(processor);
+ convertedStream.process(supplier);
driver.setUp(builder, stateDir);
for (int i = 0; i < originalKeys.length; i++) {
@@ -88,10 +88,10 @@ public class KTableMapKeysTest {
}
driver.flushState();
- assertEquals(3, processor.processed.size());
+ assertEquals(3, supplier.theCapturedProcessor().processed.size());
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
}
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 5d92846..3cd7701 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -54,7 +55,7 @@ public class KTableMapValuesTest {
stateDir = TestUtils.tempDirectory("kafka-test");
}
- private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
+ private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) {
driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
driver.process(topic1, "A", "1");
@@ -62,7 +63,7 @@ public class KTableMapValuesTest {
driver.process(topic1, "C", "3");
driver.process(topic1, "D", "4");
driver.flushState();
- assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed);
}
@Test
@@ -79,10 +80,10 @@ public class KTableMapValuesTest {
}
});
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
- doTestKTable(builder, topic1, proc2);
+ doTestKTable(builder, topic1, supplier);
}
@Test
@@ -99,10 +100,10 @@ public class KTableMapValuesTest {
}
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
- doTestKTable(builder, topic1, proc2);
+ doTestKTable(builder, topic1, supplier);
}
private void doTestValueGetter(final StreamsBuilder builder,
@@ -282,11 +283,14 @@ public class KTableMapValuesTest {
}
});
- MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, table2.name);
+ builder.build().addProcessor("proc", supplier, table2.name);
driver.setUp(builder, stateDir);
+
+ final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+
assertFalse(table1.sendingOldValueEnabled());
assertFalse(table2.sendingOldValueEnabled());
@@ -332,11 +336,14 @@ public class KTableMapValuesTest {
table2.enableSendingOldValues();
- MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", proc, table2.name);
+ builder.build().addProcessor("proc", supplier, table2.name);
driver.setUp(builder, stateDir);
+
+ final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+
assertTrue(table1.sendingOldValueEnabled());
assertTrue(table2.sendingOldValueEnabled());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 97c9c7f..70efb41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -61,8 +62,8 @@ public class KTableSourceTest {
final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
- final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- table1.toStream().process(proc1);
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ table1.toStream().process(supplier);
driver.setUp(builder, stateDir);
driver.process(topic1, "A", 1);
@@ -74,7 +75,7 @@ public class KTableSourceTest {
driver.process(topic1, "B", null);
driver.flushState();
- assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed);
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), supplier.theCapturedProcessor().processed);
}
@Test
@@ -145,11 +146,14 @@ public class KTableSourceTest {
final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
- final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc1", proc1, table1.name);
+ builder.build().addProcessor("proc1", supplier, table1.name);
driver.setUp(builder, stateDir);
+
+ final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+
driver.process(topic1, "A", "01");
driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01");
@@ -187,12 +191,14 @@ public class KTableSourceTest {
assertTrue(table1.sendingOldValueEnabled());
- final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc1", proc1, table1.name);
+ builder.build().addProcessor("proc1", supplier, table1.name);
driver.setUp(builder, stateDir);
+ final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+
driver.process(topic1, "A", "01");
driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index e3b888d..d1d25e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -638,10 +638,6 @@ public class TopologyBuilderTest {
}
@Override
- public void punctuate(long timestamp) {
- }
-
- @Override
public void close() {
}
};
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index aac275d..43dc38e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -177,9 +177,6 @@ public class AbstractProcessorContextTest {
}
@Override
- public void schedule(final long interval) {}
-
- @Override
public <K, V> void forward(final K key, final V value) {}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 5637dab..f3e369f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -64,8 +64,8 @@ public class GlobalStateTaskTest {
new String[]{topic2},
new IntegerDeserializer(),
new IntegerDeserializer());
- private final MockProcessorNode processorOne = new MockProcessorNode<>(-1);
- private final MockProcessorNode processorTwo = new MockProcessorNode<>(-1);
+ private final MockProcessorNode processorOne = new MockProcessorNode<>();
+ private final MockProcessorNode processorTwo = new MockProcessorNode<>();
private final Map<TopicPartition, Long> offsets = new HashMap<>();
private final NoOpProcessorContext context = new NoOpProcessorContext();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b3663fa..149a158 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -607,9 +607,6 @@ public class InternalTopologyBuilderTest {
public void process(final Object key, final Object value) { }
@Override
- public void punctuate(final long timestamp) { }
-
- @Override
public void close() {
}
};
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a7a2610..0992063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -62,11 +62,6 @@ public class ProcessorNodeTest {
}
@Override
- public void punctuate(final long timestamp) {
- throw new RuntimeException();
- }
-
- @Override
public void close() {
throw new RuntimeException();
}
@@ -84,11 +79,6 @@ public class ProcessorNodeTest {
}
@Override
- public void punctuate(final long timestamp) {
-
- }
-
- @Override
public void close() {
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a80b25d..51d4e05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -465,11 +464,6 @@ public class ProcessorTopologyTest {
public void process(final String key, final String value) {
context().forward(key, value);
}
-
- @Override
- public void punctuate(final long streamTime) {
- context().forward(Long.toString(streamTime), "punctuate");
- }
}
/**
@@ -510,14 +504,6 @@ public class ProcessorTopologyTest {
context().forward(key, value + "(" + (i + 1) + ")", i);
}
}
-
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(final long streamTime) {
- for (int i = 0; i != numChildren; ++i) {
- context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
- }
- }
}
/**
@@ -538,19 +524,10 @@ public class ProcessorTopologyTest {
context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
}
}
-
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(final long streamTime) {
- for (int i = 0; i != numChildren; ++i) {
- context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
- }
- }
}
/**
- * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
- * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
+ * A processor that stores each key-value pair in an in-memory key-value store registered with the context.
*/
protected static class StatefulProcessor extends AbstractProcessor<String, String> {
private KeyValueStore<String, String> store;
@@ -573,18 +550,6 @@ public class ProcessorTopologyTest {
}
@Override
- public void punctuate(final long streamTime) {
- int count = 0;
- try (KeyValueIterator<String, String> iter = store.all()) {
- while (iter.hasNext()) {
- iter.next();
- ++count;
- }
- }
- context().forward(Long.toString(streamTime), count);
- }
-
- @Override
public void close() {
store.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index e799688..ee0d5a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -21,26 +21,24 @@ import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.test.MockProcessorNode;
import org.junit.Test;
-import java.util.ArrayList;
-
import static org.junit.Assert.assertEquals;
public class PunctuationQueueTest {
+ private final MockProcessorNode<String, String> node = new MockProcessorNode<>();
+ private final PunctuationQueue queue = new PunctuationQueue();
+ private final Punctuator punctuator = new Punctuator() {
+ @Override
+ public void punctuate(final long timestamp) {
+ node.mockProcessor.punctuatedStreamTime.add(timestamp);
+ }
+ };
+
@Test
public void testPunctuationInterval() {
- final TestProcessor processor = new TestProcessor();
- final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
- final PunctuationQueue queue = new PunctuationQueue();
- final Punctuator punctuator = new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- node.processor().punctuate(timestamp);
- }
- };
-
final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
final long now = sched.timestamp - 100L;
@@ -54,42 +52,32 @@ public class PunctuationQueueTest {
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(0, processor.punctuatedAt.size());
+ assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(0, processor.punctuatedAt.size());
+ assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(1, processor.punctuatedAt.size());
+ assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(1, processor.punctuatedAt.size());
+ assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(2, processor.punctuatedAt.size());
+ assertEquals(2, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(3, processor.punctuatedAt.size());
+ assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(3, processor.punctuatedAt.size());
+ assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(4, processor.punctuatedAt.size());
+ assertEquals(4, node.mockProcessor.punctuatedStreamTime.size());
}
@Test
public void testPunctuationIntervalCustomAlignment() {
- final TestProcessor processor = new TestProcessor();
- final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
- final PunctuationQueue queue = new PunctuationQueue();
- final Punctuator punctuator = new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- node.processor().punctuate(timestamp);
- }
- };
-
final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator);
final long now = sched.timestamp - 50L;
@@ -103,42 +91,32 @@ public class PunctuationQueueTest {
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(0, processor.punctuatedAt.size());
+ assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(0, processor.punctuatedAt.size());
+ assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(1, processor.punctuatedAt.size());
+ assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(1, processor.punctuatedAt.size());
+ assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(2, processor.punctuatedAt.size());
+ assertEquals(2, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(3, processor.punctuatedAt.size());
+ assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(3, processor.punctuatedAt.size());
+ assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(4, processor.punctuatedAt.size());
+ assertEquals(4, node.mockProcessor.punctuatedStreamTime.size());
}
@Test
public void testPunctuationIntervalCancelFromPunctuator() {
- final TestProcessor processor = new TestProcessor();
- final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
- final PunctuationQueue queue = new PunctuationQueue();
- final Punctuator punctuator = new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- node.processor().punctuate(timestamp);
- }
- };
-
final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
final long now = sched.timestamp - 100L;
@@ -154,35 +132,25 @@ public class PunctuationQueueTest {
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(0, processor.punctuatedAt.size());
+ assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(1, processor.punctuatedAt.size());
+ assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
- assertEquals(1, processor.punctuatedAt.size());
+ assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
}
private static class TestProcessor extends AbstractProcessor<String, String> {
- public final ArrayList<Long> punctuatedAt = new ArrayList<>();
-
@Override
- public void init(ProcessorContext context) {
- }
+ public void init(ProcessorContext context) {}
@Override
- public void process(String key, String value) {
- }
+ public void process(String key, String value) {}
@Override
- public void punctuate(long streamTime) {
- punctuatedAt.add(streamTime);
- }
-
- @Override
- public void close() {
- }
+ public void close() {}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 598e47e..3a0fc4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -393,7 +393,7 @@ public class StreamTaskTest {
assertFalse(task.process());
assertFalse(task.maybePunctuateStreamTime());
- processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
+ processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
}
@SuppressWarnings("unchecked")
@@ -479,7 +479,7 @@ public class StreamTaskTest {
assertFalse(task.process());
assertFalse(task.maybePunctuateStreamTime());
- processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
+ processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
}
@SuppressWarnings("unchecked")
@@ -509,11 +509,11 @@ public class StreamTaskTest {
assertTrue(task.process());
- processorStreamTime.supplier.scheduleCancellable.cancel();
+ processorStreamTime.mockProcessor.scheduleCancellable.cancel();
assertFalse(task.maybePunctuateStreamTime());
- processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
+ processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
}
@Test
@@ -533,7 +533,7 @@ public class StreamTaskTest {
time.sleep(20);
assertTrue(task.maybePunctuateSystemTime());
assertFalse(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50);
+ processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50);
}
@Test
@@ -544,7 +544,7 @@ public class StreamTaskTest {
assertFalse(task.maybePunctuateSystemTime());
time.sleep(9);
assertFalse(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+ processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
}
@Test
@@ -570,7 +570,7 @@ public class StreamTaskTest {
time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation
assertTrue(task.maybePunctuateSystemTime());
assertFalse(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
+ processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
}
@Test
@@ -581,10 +581,10 @@ public class StreamTaskTest {
final long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.scheduleCancellable.cancel();
+ processorSystemTime.mockProcessor.scheduleCancellable.cancel();
time.sleep(10);
assertFalse(task.maybePunctuateSystemTime());
- processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
+ processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3ae7acb..5bc1934 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -894,10 +894,6 @@ public class StreamThreadTest {
@Override
public void process(final Object key, final Object value) {}
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(final long timestamp) {}
-
@Override
public void close() {}
};
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 5e61910..27a0094 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -198,9 +198,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
}
@Override
- public void schedule(final long interval) { }
-
- @Override
public void commit() { }
@Override
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index c93a306..cf4460d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -205,21 +205,6 @@ public class KStreamTestDriver extends ExternalResource {
return topicNode;
}
- public void punctuate(final long timestamp) {
- final ProcessorNode prevNode = context.currentNode();
- for (final ProcessorNode processor : topology.processors()) {
- if (processor.processor() != null) {
- context.setRecordContext(createRecordContext(context.topic(), timestamp));
- context.setCurrentNode(processor);
- try {
- processor.processor().punctuate(timestamp);
- } finally {
- context.setCurrentNode(prevNode);
- }
- }
- }
- }
-
public void setTime(final long timestamp) {
context.setTime(timestamp);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
similarity index 53%
copy from streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
copy to streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index bdc8d40..927be0b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -18,9 +18,7 @@ package org.apache.kafka.test;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -28,7 +26,7 @@ import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
-public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
+public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public final ArrayList<String> processed = new ArrayList<>();
public final ArrayList<K> processedKeys = new ArrayList<>();
@@ -37,67 +35,50 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
- private final long scheduleInterval;
- private final PunctuationType punctuationType;
public Cancellable scheduleCancellable;
- public MockProcessorSupplier() {
- this(-1L);
- }
-
- public MockProcessorSupplier(long scheduleInterval) {
- this(scheduleInterval, PunctuationType.STREAM_TIME);
- }
+ private final PunctuationType punctuationType;
+ private final long scheduleInterval;
- public MockProcessorSupplier(long scheduleInterval, PunctuationType punctuationType) {
- this.scheduleInterval = scheduleInterval;
+ public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) {
this.punctuationType = punctuationType;
+ this.scheduleInterval = scheduleInterval;
}
- @Override
- public Processor<K, V> get() {
- return new MockProcessor(punctuationType);
+ public MockProcessor() {
+ this(PunctuationType.STREAM_TIME, -1);
}
- public class MockProcessor extends AbstractProcessor<K, V> {
-
- PunctuationType punctuationType;
-
- public MockProcessor(PunctuationType punctuationType) {
- this.punctuationType = punctuationType;
- }
-
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- if (scheduleInterval > 0L) {
- scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- if (punctuationType == PunctuationType.STREAM_TIME) {
- assertEquals(timestamp, context().timestamp());
- }
- assertEquals(-1, context().partition());
- assertEquals(-1L, context().offset());
-
- (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
- .add(timestamp);
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ if (scheduleInterval > 0L) {
+ scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
+ @Override
+ public void punctuate(final long timestamp) {
+ if (punctuationType == PunctuationType.STREAM_TIME) {
+ assertEquals(timestamp, context().timestamp());
}
- });
- }
+ assertEquals(-1, context().partition());
+ assertEquals(-1L, context().offset());
+
+ (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
+ .add(timestamp);
+ }
+ });
}
+ }
- @Override
- public void process(K key, V value) {
- processedKeys.add(key);
- processedValues.add(value);
- processed.add((key == null ? "null" : key) + ":" +
- (value == null ? "null" : value));
+ @Override
+ public void process(final K key, final V value) {
+ processedKeys.add(key);
+ processedValues.add(value);
+ processed.add((key == null ? "null" : key) + ":" +
+ (value == null ? "null" : value));
- }
}
- public void checkAndClearProcessResult(String... expected) {
+ public void checkAndClearProcessResult(final String... expected) {
assertEquals("the number of outputs:" + processed, expected.length, processed.size());
for (int i = 0; i < expected.length; i++) {
assertEquals("output[" + i + "]:", expected[i], processed.get(i));
@@ -107,13 +88,12 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
}
public void checkEmptyAndClearProcessResult() {
-
assertEquals("the number of outputs:", 0, processed.size());
processed.clear();
}
- public void checkAndClearPunctuateResult(PunctuationType type, long... expected) {
- ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
+ public void checkAndClearPunctuateResult(final PunctuationType type, final long... expected) {
+ final ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
assertEquals("the number of outputs:", expected.length, punctuated.size());
for (int i = 0; i < expected.length; i++) {
@@ -122,5 +102,4 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
processed.clear();
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index a526bfd..094cb03 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,7 +17,6 @@
package org.apache.kafka.test;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
@@ -29,9 +28,9 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
private static final String NAME = "MOCK-PROCESS-";
private static final AtomicInteger INDEX = new AtomicInteger(1);
- public final MockProcessorSupplier<K, V> supplier;
+ public final MockProcessor<K, V> mockProcessor;
+
public boolean closed;
- public long punctuatedAt;
public boolean initialized;
public MockProcessorNode(long scheduleInterval) {
@@ -39,13 +38,17 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
}
public MockProcessorNode(long scheduleInterval, PunctuationType punctuationType) {
- this(new MockProcessorSupplier<K, V>(scheduleInterval, punctuationType));
+ this(new MockProcessor<K, V>(punctuationType, scheduleInterval));
+ }
+
+ public MockProcessorNode() {
+ this(new MockProcessor<K, V>());
}
- private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
- super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
+ private MockProcessorNode(final MockProcessor<K, V> mockProcessor) {
+ super(NAME + INDEX.getAndIncrement(), mockProcessor, Collections.<String>emptySet());
- this.supplier = supplier;
+ this.mockProcessor = mockProcessor;
}
@Override
@@ -60,12 +63,6 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
}
@Override
- public void punctuate(final long timestamp, final Punctuator punctuator) {
- super.punctuate(timestamp, punctuator);
- this.punctuatedAt = timestamp;
- }
-
- @Override
public void close() {
super.close();
this.closed = true;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index bdc8d40..aec47a4 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -16,30 +16,20 @@
*/
package org.apache.kafka.test;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
import java.util.ArrayList;
+import java.util.List;
import static org.junit.Assert.assertEquals;
public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
- public final ArrayList<String> processed = new ArrayList<>();
- public final ArrayList<K> processedKeys = new ArrayList<>();
- public final ArrayList<V> processedValues = new ArrayList<>();
-
- public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
- public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
-
private final long scheduleInterval;
private final PunctuationType punctuationType;
- public Cancellable scheduleCancellable;
+ private final List<MockProcessor<K, V>> processors = new ArrayList<>();
public MockProcessorSupplier() {
this(-1L);
@@ -56,71 +46,20 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
@Override
public Processor<K, V> get() {
- return new MockProcessor(punctuationType);
- }
-
- public class MockProcessor extends AbstractProcessor<K, V> {
-
- PunctuationType punctuationType;
-
- public MockProcessor(PunctuationType punctuationType) {
- this.punctuationType = punctuationType;
- }
-
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- if (scheduleInterval > 0L) {
- scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
- @Override
- public void punctuate(long timestamp) {
- if (punctuationType == PunctuationType.STREAM_TIME) {
- assertEquals(timestamp, context().timestamp());
- }
- assertEquals(-1, context().partition());
- assertEquals(-1L, context().offset());
-
- (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
- .add(timestamp);
- }
- });
- }
- }
-
- @Override
- public void process(K key, V value) {
- processedKeys.add(key);
- processedValues.add(value);
- processed.add((key == null ? "null" : key) + ":" +
- (value == null ? "null" : value));
-
- }
- }
-
- public void checkAndClearProcessResult(String... expected) {
- assertEquals("the number of outputs:" + processed, expected.length, processed.size());
- for (int i = 0; i < expected.length; i++) {
- assertEquals("output[" + i + "]:", expected[i], processed.get(i));
- }
-
- processed.clear();
+ final MockProcessor<K, V> processor = new MockProcessor<>(punctuationType, scheduleInterval);
+ processors.add(processor);
+ return processor;
}
- public void checkEmptyAndClearProcessResult() {
-
- assertEquals("the number of outputs:", 0, processed.size());
- processed.clear();
+ // get the captured processor assuming that only one processor gets returned from this supplier
+ public MockProcessor<K, V> theCapturedProcessor() {
+ return capturedProcessors(1).get(0);
}
- public void checkAndClearPunctuateResult(PunctuationType type, long... expected) {
- ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
- assertEquals("the number of outputs:", expected.length, punctuated.size());
+ // get the captured processors with the expected number
+ public List<MockProcessor<K, V>> capturedProcessors(final int expectedNumberOfProcessors) {
+ assertEquals(expectedNumberOfProcessors, processors.size());
- for (int i = 0; i < expected.length; i++) {
- assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
- }
-
- processed.clear();
+ return processors;
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 6b5d47a..e931c7e 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -57,10 +57,6 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
}
@Override
- public void schedule(final long interval) {
- }
-
- @Override
public <K, V> void forward(final K key, final V value) {
forwardedValues.put(key, value);
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 8f0ae93..4cee0ac 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -21,7 +21,6 @@ package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 94c36ad..d3ccaec 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -284,14 +284,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
override def init(context: ProcessorContext): Unit = transformerSupplier.init(context)
- @deprecated ("Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead", "0.1.3") // scalastyle:ignore
- override def punctuate(timestamp: Long): KeyValue[K1, V1] = {
- transformerSupplier.punctuate(timestamp) match {
- case (k1, v1) => KeyValue.pair[K1, V1](k1, v1)
- case _ => null
- }
- }
-
override def close(): Unit = transformerSupplier.close()
}
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 1aa1978..226192f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -23,7 +23,6 @@ package kstream
import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
import org.apache.kafka.streams.state.WindowStore
import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 15b2da6..c387c36 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -361,14 +361,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
};
}
- @Override
- public void schedule(final long interval) {
- throw new UnsupportedOperationException(
- "schedule() is deprecated and not supported in Mock. " +
- "Use schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) instead."
- );
- }
-
/**
* Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
*
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 8c5ec46..934e043 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -361,10 +361,6 @@ public class MockProcessorContextTest {
}
@Override
- public void punctuate(final long timestamp) {
- }
-
- @Override
public void close() {
}
};
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 077b8ca..5259ef2 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -185,10 +185,6 @@ public class TopologyTestDriverTest {
private boolean closed = false;
private final List<Record> processedRecords = new ArrayList<>();
- MockProcessor() {
- this(Collections.<Punctuation>emptySet());
- }
-
MockProcessor(final Collection<Punctuation> punctuations) {
this.punctuations = punctuations;
}
@@ -208,10 +204,6 @@ public class TopologyTestDriverTest {
context.forward(key, value);
}
- @SuppressWarnings("deprecation")
- @Override
- public void punctuate(long timestamp) {} // deprecated
-
@Override
public void close() {
closed = true;
@@ -840,9 +832,6 @@ public class TopologyTestDriverTest {
}
@Override
- public void punctuate(final long timestamp) {}
-
- @Override
public void close() {}
}
@@ -870,9 +859,6 @@ public class TopologyTestDriverTest {
}
@Override
- public void punctuate(final long timestamp) {}
-
- @Override
public void close() {}
};
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.