You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/06 22:17:32 UTC
[kafka] branch 2.0 updated: KAFKA-6967: TopologyTestDriver does not
allow pre-populating state stores that have change logging (#5096)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new a0dd05c KAFKA-6967: TopologyTestDriver does not allow pre-populating state stores that have change logging (#5096)
a0dd05c is described below
commit a0dd05c2c7947d18e71e2ea1bb83683509fe57fe
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jun 6 15:16:21 2018 -0700
KAFKA-6967: TopologyTestDriver does not allow pre-populating state stores that have change logging (#5096)
Reviewers: Guozhang Wang <gu...@confluent.io>, James Cheng <jy...@yahoo.com>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
build.gradle | 1 +
.../kstream/internals/KGroupedTableImplTest.java | 4 +-
.../apache/kafka/streams/TopologyTestDriver.java | 69 +++++------
.../streams/processor/MockProcessorContext.java | 35 +++++-
.../kafka/streams/test/ConsumerRecordFactory.java | 31 ++++-
.../apache/kafka/streams/test/OutputVerifier.java | 14 +++
.../kafka/streams/MockProcessorContextTest.java | 11 +-
.../kafka/streams/TopologyTestDriverTest.java | 133 ++++++++++++---------
8 files changed, 190 insertions(+), 108 deletions(-)
diff --git a/build.gradle b/build.gradle
index 4f3fd77..14479f1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1017,6 +1017,7 @@ project(':streams:test-utils') {
testCompile project(':clients').sourceSets.test.output
testCompile libs.junit
+ testCompile libs.easymock
testRuntime libs.slf4jlog4j
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 4b8298f..79e0b42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -199,7 +199,7 @@ public class KGroupedTableImplTest {
final Map<String, Integer> results = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(results, topic, driver);
- final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
+ final KeyValueStore<String, Integer> reduce = driver.getKeyValueStore("reduce");
assertThat(reduce.get("A"), equalTo(5));
assertThat(reduce.get("B"), equalTo(6));
}
@@ -240,7 +240,7 @@ public class KGroupedTableImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(topic, driver);
- final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
+ final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
assertThat(aggregate.get("2"), equalTo("0+2+2"));
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e46ec6a..773cbb4 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -170,7 +171,7 @@ import java.util.regex.Pattern;
@InterfaceStability.Evolving
public class TopologyTestDriver implements Closeable {
- private final Time mockTime;
+ private final Time mockWallClockTime;
private final InternalTopologyBuilder internalTopologyBuilder;
private final static int PARTITION_ID = 0;
@@ -179,6 +180,8 @@ public class TopologyTestDriver implements Closeable {
private final GlobalStateUpdateTask globalStateTask;
private final GlobalStateManager globalStateManager;
+ private final InternalProcessorContext context;
+
private final StateDirectory stateDirectory;
private final Metrics metrics;
private final ProcessorTopology processorTopology;
@@ -216,7 +219,6 @@ public class TopologyTestDriver implements Closeable {
public TopologyTestDriver(final Topology topology,
final Properties config,
final long initialWallClockTimeMs) {
-
this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
}
@@ -225,25 +227,13 @@ public class TopologyTestDriver implements Closeable {
*
* @param builder builder for the topology to be tested
* @param config the configuration for the topology
- */
- TopologyTestDriver(final InternalTopologyBuilder builder,
- final Properties config) {
- this(builder, config, System.currentTimeMillis());
-
- }
-
- /**
- * Create a new test diver instance.
- *
- * @param builder builder for the topology to be tested
- * @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
*/
private TopologyTestDriver(final InternalTopologyBuilder builder,
- final Properties config,
- final long initialWallClockTimeMs) {
+ final Properties config,
+ final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new StreamsConfig(config);
- mockTime = new MockTime(initialWallClockTimeMs);
+ mockWallClockTime = new MockTime(initialWallClockTimeMs);
internalTopologyBuilder = builder;
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
@@ -260,7 +250,7 @@ public class TopologyTestDriver implements Closeable {
};
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- stateDirectory = new StateDirectory(streamsConfig, mockTime);
+ stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
@@ -323,6 +313,7 @@ public class TopologyTestDriver implements Closeable {
new LogContext()
);
globalStateTask.initialize();
+ globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
} else {
globalStateManager = null;
globalStateTask = null;
@@ -342,12 +333,15 @@ public class TopologyTestDriver implements Closeable {
streamsMetrics,
stateDirectory,
cache,
- mockTime,
+ mockWallClockTime,
producer);
task.initializeStateStores();
task.initializeTopology();
+ context = (InternalProcessorContext) task.context();
+ context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
} else {
task = null;
+ context = null;
}
}
@@ -356,6 +350,7 @@ public class TopologyTestDriver implements Closeable {
*
* @return Map of all metrics.
*/
+ @SuppressWarnings("WeakerAccess")
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(metrics.metrics());
}
@@ -390,13 +385,10 @@ public class TopologyTestDriver implements Closeable {
consumerRecord.headers())));
// Process the record ...
- ((InternalProcessorContext) task.context()).setRecordContext(
- new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName, consumerRecord.headers()));
task.process();
task.maybePunctuateStreamTime();
task.commit();
captureOutputRecords();
-
} else {
final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
if (globalTopicPartition == null) {
@@ -446,12 +438,7 @@ public class TopologyTestDriver implements Closeable {
final List<ProducerRecord<byte[], byte[]>> output = producer.history();
producer.clear();
for (final ProducerRecord<byte[], byte[]> record : output) {
- Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
- if (outputRecords == null) {
- outputRecords = new LinkedList<>();
- outputRecordsByTopic.put(record.topic(), outputRecords);
- }
- outputRecords.add(record);
+ outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new LinkedList<>()).add(record);
// Forward back into the topology if the produced record is to an internal or a source topic ...
final String outputTopicName = record.topic();
@@ -497,7 +484,7 @@ public class TopologyTestDriver implements Closeable {
*/
@SuppressWarnings("WeakerAccess")
public void advanceWallClockTime(final long advanceMs) {
- mockTime.sleep(advanceMs);
+ mockWallClockTime.sleep(advanceMs);
if (task != null) {
task.maybePunctuateSystemTime();
task.commit();
@@ -549,6 +536,8 @@ public class TopologyTestDriver implements Closeable {
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ * <p>
+ * Note, that {@code StateStore} might be {@code null} if a store is added but not connected to any processor.
*
* @return all stores by name
* @see #getStateStore(String)
@@ -579,13 +568,24 @@ public class TopologyTestDriver implements Closeable {
* @see #getWindowStore(String)
* @see #getSessionStore(String)
*/
+ @SuppressWarnings("WeakerAccess")
public StateStore getStateStore(final String name) {
- StateStore stateStore = task == null ? null :
- ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
- if (stateStore == null && globalStateManager != null) {
- stateStore = globalStateManager.getGlobalStore(name);
+ if (task != null) {
+ final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+ if (stateStore != null) {
+ return stateStore;
+ }
}
- return stateStore;
+
+ if (globalStateManager != null) {
+ final StateStore stateStore = globalStateManager.getGlobalStore(name);
+ if (stateStore != null) {
+ return stateStore;
+ }
+
+ }
+
+ return null;
}
/**
@@ -651,6 +651,7 @@ public class TopologyTestDriver implements Closeable {
/**
* Close the driver, its topology, and all processors.
*/
+ @SuppressWarnings("WeakerAccess")
public void close() {
if (task != null) {
task.close(true, false);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index b14a791..cba0257 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -86,23 +86,27 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.punctuator = punctuator;
}
+ @SuppressWarnings({"WeakerAccess", "unused"})
public long getIntervalMs() {
return intervalMs;
}
+ @SuppressWarnings({"WeakerAccess", "unused"})
public PunctuationType getType() {
return type;
}
+ @SuppressWarnings({"WeakerAccess", "unused"})
public Punctuator getPunctuator() {
return punctuator;
}
- @SuppressWarnings("WeakerAccess")
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void cancel() {
cancelled = true;
}
+ @SuppressWarnings({"WeakerAccess", "unused"})
public boolean cancelled() {
return cancelled;
}
@@ -127,6 +131,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return The child name, or {@code null} if it was broadcast.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public String childName() {
return childName;
}
@@ -136,6 +141,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return A timestamp, or {@code -1} if none was forwarded.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public long timestamp() {
return timestamp;
}
@@ -145,6 +151,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return A key/value pair. Not null.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public KeyValue keyValue() {
return keyValue;
}
@@ -158,6 +165,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* and most unit tests should be able to get by with the
* {@link InMemoryKeyValueStore}, so the stateDir won't matter.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext() {
//noinspection DoubleBraceInitialization
this(
@@ -179,6 +187,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @param config a Properties object, used to configure the context and the processor.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config) {
this(config, new TaskId(0, 0), null);
}
@@ -190,6 +199,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
* @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
final StreamsConfig streamsConfig = new StreamsConfig(config);
this.taskId = taskId;
@@ -252,6 +262,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* @param offset A record offset
* @param timestamp A record timestamp
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp) {
this.topic = topic;
this.partition = partition;
@@ -260,13 +271,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.timestamp = timestamp;
}
-
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param topic A topic name
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void setTopic(final String topic) {
this.topic = topic;
}
@@ -277,21 +288,29 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @param partition A partition number
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void setPartition(final int partition) {
this.partition = partition;
}
-
/**
* The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
* but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
*
* @param offset A record offset
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void setOffset(final long offset) {
this.offset = offset;
}
+ /**
+ * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+ * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+ *
+ * @param headers Record headers
+ */
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void setHeaders(final Headers headers) {
this.headers = headers;
}
@@ -302,6 +321,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @param timestamp A record timestamp
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}
@@ -345,7 +365,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
// mocks ================================================
-
@Override
public void register(final StateStore store,
final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
@@ -376,6 +395,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return A list of captured punctuators.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedPunctuator> scheduledPunctuators() {
final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
capturedPunctuators.addAll(punctuators);
@@ -394,6 +414,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
}
+ @SuppressWarnings("deprecation")
@Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException(
@@ -402,6 +423,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
);
}
+ @SuppressWarnings("deprecation")
@Override
public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException(
@@ -417,6 +439,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return A list of key/value pairs that were previously passed to the context.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedForward> forwarded() {
final LinkedList<CapturedForward> result = new LinkedList<>();
result.addAll(capturedForwards);
@@ -431,6 +454,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* @param childName The child name to retrieve forwards for
* @return A list of key/value pairs that were previously passed to the context.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<CapturedForward> forwarded(final String childName) {
final LinkedList<CapturedForward> result = new LinkedList<>();
for (final CapturedForward capture : capturedForwards) {
@@ -444,6 +468,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
/**
* Clear the captured forwarded data.
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void resetForwards() {
capturedForwards.clear();
}
@@ -458,6 +483,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*
* @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
*/
+ @SuppressWarnings("WeakerAccess")
public boolean committed() {
return committed;
}
@@ -465,6 +491,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
/**
* Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void resetCommit() {
committed = false;
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index 507249d..108dafd 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -44,7 +44,7 @@ public class ConsumerRecordFactory<K, V> {
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private long timeMs;
- private long advanceMs;
+ private final long advanceMs;
/**
* Create a new factory for the given topic.
@@ -54,6 +54,7 @@ public class ConsumerRecordFactory<K, V> {
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(null, keySerializer, valueSerializer, System.currentTimeMillis());
@@ -68,6 +69,7 @@ public class ConsumerRecordFactory<K, V> {
* @param keySerializer the key serializer
* @param valueSerializer the value serializer
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final String defaultTopicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
@@ -82,6 +84,7 @@ public class ConsumerRecordFactory<K, V> {
* @param valueSerializer the value serializer
* @param startTimestampMs the initial timestamp for generated records
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final long startTimestampMs) {
@@ -97,6 +100,7 @@ public class ConsumerRecordFactory<K, V> {
* @param valueSerializer the value serializer
* @param startTimestampMs the initial timestamp for generated records
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final String defaultTopicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
@@ -112,6 +116,7 @@ public class ConsumerRecordFactory<K, V> {
* @param startTimestampMs the initial timestamp for generated records
* @param autoAdvanceMs the time increment per generated record
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final long startTimestampMs,
@@ -128,6 +133,7 @@ public class ConsumerRecordFactory<K, V> {
* @param startTimestampMs the initial timestamp for generated records
* @param autoAdvanceMs the time increment per generated record
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecordFactory(final String defaultTopicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
@@ -147,6 +153,7 @@ public class ConsumerRecordFactory<K, V> {
*
* @param advanceMs the amount of time to advance
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public void advanceTimeMs(final long advanceMs) {
if (advanceMs < 0) {
throw new IllegalArgumentException("advanceMs must be positive");
@@ -165,6 +172,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
@@ -198,6 +206,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
@@ -214,6 +223,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final long timestampMs) {
@@ -230,6 +240,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final Headers headers,
@@ -250,6 +261,7 @@ public class ConsumerRecordFactory<K, V> {
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value) {
@@ -268,6 +280,7 @@ public class ConsumerRecordFactory<K, V> {
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final K key,
final V value,
@@ -285,6 +298,7 @@ public class ConsumerRecordFactory<K, V> {
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value) {
return create(key, value, new RecordHeaders());
@@ -299,6 +313,7 @@ public class ConsumerRecordFactory<K, V> {
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final K key,
final V value,
final Headers headers) {
@@ -318,6 +333,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
final long timestampMs) {
@@ -334,6 +350,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
final Headers headers,
@@ -349,6 +366,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value,
final long timestampMs) {
return create(value, new RecordHeaders(), timestampMs);
@@ -363,6 +381,7 @@ public class ConsumerRecordFactory<K, V> {
* @param timestampMs the record timestamp
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value,
final Headers headers,
final long timestampMs) {
@@ -382,6 +401,7 @@ public class ConsumerRecordFactory<K, V> {
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value,
final Headers headers) {
@@ -396,6 +416,7 @@ public class ConsumerRecordFactory<K, V> {
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final String topicName,
final V value) {
return create(topicName, null, value, new RecordHeaders());
@@ -408,6 +429,7 @@ public class ConsumerRecordFactory<K, V> {
* @param value the record value
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value) {
return create(value, new RecordHeaders());
}
@@ -420,6 +442,7 @@ public class ConsumerRecordFactory<K, V> {
* @param headers the record headers
* @return the generated {@link ConsumerRecord}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public ConsumerRecord<byte[], byte[]> create(final V value,
final Headers headers) {
if (topicName == null) {
@@ -437,6 +460,7 @@ public class ConsumerRecordFactory<K, V> {
* @param keyValues the record keys and values
* @return the generated {@link ConsumerRecord consumer records}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
final List<KeyValue<K, V>> keyValues) {
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
@@ -455,6 +479,7 @@ public class ConsumerRecordFactory<K, V> {
* @param keyValues the record keys and values
* @return the generated {@link ConsumerRecord consumer records}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues) {
if (topicName == null) {
throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
@@ -474,6 +499,7 @@ public class ConsumerRecordFactory<K, V> {
* @param advanceMs the time difference between two consecutive generated records
* @return the generated {@link ConsumerRecord consumer records}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
final List<KeyValue<K, V>> keyValues,
final long startTimestamp,
@@ -502,6 +528,7 @@ public class ConsumerRecordFactory<K, V> {
* @param advanceMs the time difference between two consecutive generated records
* @return the generated {@link ConsumerRecord consumer records}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
final long startTimestamp,
final long advanceMs) {
@@ -523,6 +550,7 @@ public class ConsumerRecordFactory<K, V> {
* @param startTimestamp the timestamp for the first generated record
* @return the generated {@link ConsumerRecord consumer records}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
final List<KeyValue<K, V>> keyValues,
final long startTimestamp) {
@@ -538,6 +566,7 @@ public class ConsumerRecordFactory<K, V> {
* @param startTimestamp the timestamp for the first generated record
* @return the generated {@link ConsumerRecord consumer records}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
final long startTimestamp) {
if (topicName == null) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
index aedb910..f78e926 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -39,6 +39,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedValue}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValue(final ProducerRecord<K, V> record,
final V expectedValue) throws AssertionError {
Objects.requireNonNull(record);
@@ -65,6 +66,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedRecord}'s value
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValue(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
@@ -82,6 +84,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedKey} or {@code expectedValue}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue) throws AssertionError {
@@ -119,6 +122,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedRecord}'s key or value
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
@@ -136,6 +140,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedValue} or {@code expectedTimestamp}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
final V expectedValue,
final long expectedTimestamp) throws AssertionError {
@@ -169,6 +174,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedRecord}'s value or timestamp
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
@@ -189,6 +195,7 @@ public class OutputVerifier {
* @throws AssertionError if {@code ProducerRecord}'s key, value, timestamp is not equal to {@code expectedKey},
* {@code expectedValue}, or {@code expectedTimestamps}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
@@ -233,6 +240,7 @@ public class OutputVerifier {
* @throws AssertionError if {@code ProducerRecord}'s key, value, or timestamp is not equal to
* {@code expectedRecord}'s key, value, or timestamp
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
@@ -250,6 +258,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedValue} or {@code expectedHeaders}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
final V expectedValue,
final Headers expectedHeaders) throws AssertionError {
@@ -287,6 +296,7 @@ public class OutputVerifier {
* @param <V> the value type
* @throws AssertionError if {@code ProducerRecord}'s value or headers is not equal to {@code expectedRecord}'s value or headers
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareValueHeaders(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
@@ -307,6 +317,7 @@ public class OutputVerifier {
* @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey},
* {@code expectedValue}, or {@code expectedHeaders}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
@@ -355,6 +366,7 @@ public class OutputVerifier {
* @throws AssertionError if {@code ProducerRecord}'s key, value, or headers is not equal to
* {@code expectedRecord}'s key, value, or headers
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeaders(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
@@ -376,6 +388,7 @@ public class OutputVerifier {
* @throws AssertionError if {@code ProducerRecord}'s key, value, headers is not equal to {@code expectedKey},
* {@code expectedValue}, or {@code expectedHeaders}
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
@@ -432,6 +445,7 @@ public class OutputVerifier {
* @throws AssertionError if {@code ProducerRecord}'s key, value, headers, or timestamp is not equal to
* {@code expectedRecord}'s key, value, headers, or timestamp
*/
+ @SuppressWarnings({"WeakerAccess", "unused"})
public static <K, V> void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record,
final ProducerRecord<K, V> expectedRecord) throws AssertionError {
Objects.requireNonNull(expectedRecord);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 64d5b12..878aa35 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -156,9 +156,9 @@ public class MockProcessorContextTest {
@Test
public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+ @SuppressWarnings("deprecation")
@Override
public void process(final String key, final Long value) {
- //noinspection deprecation
context().forward(key, value, 0);
}
};
@@ -178,9 +178,9 @@ public class MockProcessorContextTest {
@Test
public void shouldThrowIfForwardedWithDeprecatedChildName() {
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+ @SuppressWarnings("deprecation")
@Override
public void process(final String key, final Long value) {
- //noinspection deprecation
context().forward(key, value, "child1");
}
};
@@ -347,12 +347,7 @@ public class MockProcessorContextTest {
context.schedule(
1000L,
PunctuationType.WALL_CLOCK_TIME,
- new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- context.commit();
- }
- }
+ timestamp -> context.commit()
);
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2d446d1..7552637 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -31,14 +31,15 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -64,6 +65,7 @@ import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -100,26 +102,27 @@ public class TopologyTestDriverTest {
};
private KeyValueStore<String, Long> store;
- private StringDeserializer stringDeserializer = new StringDeserializer();
- private LongDeserializer longDeserializer = new LongDeserializer();
- private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
+ private final StringDeserializer stringDeserializer = new StringDeserializer();
+ private final LongDeserializer longDeserializer = new LongDeserializer();
+ private final ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
new StringSerializer(),
new LongSerializer());
private final static class Record {
- private Object key;
- private Object value;
- private long timestamp;
- private long offset;
- private String topic;
- private Headers headers;
-
- Record(final ConsumerRecord consumerRecord) {
+ private final Object key;
+ private final Object value;
+ private final long timestamp;
+ private final long offset;
+ private final String topic;
+ private final Headers headers;
+
+ Record(final ConsumerRecord consumerRecord,
+ final long newOffset) {
key = consumerRecord.key();
value = consumerRecord.value();
timestamp = consumerRecord.timestamp();
- offset = consumerRecord.offset();
+ offset = newOffset;
topic = consumerRecord.topic();
headers = consumerRecord.headers();
}
@@ -184,7 +187,7 @@ public class TopologyTestDriverTest {
private final List<Long> punctuatedAt = new LinkedList<>();
@Override
- public void punctuate(long timestamp) {
+ public void punctuate(final long timestamp) {
punctuatedAt.add(timestamp);
}
}
@@ -202,7 +205,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
initialized = true;
this.context = context;
for (final Punctuation punctuation : punctuations) {
@@ -211,7 +214,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void process(Object key, Object value) {
+ public void process(final Object key, final Object value) {
processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
context.forward(key, value);
}
@@ -228,7 +231,7 @@ public class TopologyTestDriverTest {
private final Collection<Punctuation> punctuations;
private MockProcessorSupplier() {
- this(Collections.<Punctuation>emptySet());
+ this(Collections.emptySet());
}
private MockProcessorSupplier(final Collection<Punctuation> punctuations) {
@@ -391,8 +394,7 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords.size());
final Record record = processedRecords.get(0);
- final Record expectedResult = new Record(consumerRecord1);
- expectedResult.offset = 0L;
+ final Record expectedResult = new Record(consumerRecord1, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -410,8 +412,7 @@ public class TopologyTestDriverTest {
assertEquals(0, processedRecords2.size());
Record record = processedRecords1.get(0);
- Record expectedResult = new Record(consumerRecord1);
- expectedResult.offset = 0L;
+ Record expectedResult = new Record(consumerRecord1, 0L);
assertThat(record, equalTo(expectedResult));
testDriver.pipeInput(consumerRecord2);
@@ -420,8 +421,7 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords2.size());
record = processedRecords2.get(0);
- expectedResult = new Record(consumerRecord2);
- expectedResult.offset = 0L;
+ expectedResult = new Record(consumerRecord2, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -439,7 +439,7 @@ public class TopologyTestDriverTest {
topology.addSink(
"sink",
SINK_TOPIC_1,
- new Serializer() {
+ new Serializer<Object>() {
@Override
public byte[] serialize(final String topic, final Object data) {
if (data instanceof Long) {
@@ -452,7 +452,7 @@ public class TopologyTestDriverTest {
@Override
public void configure(final Map configs, final boolean isKey) {}
},
- new Serializer() {
+ new Serializer<Object>() {
@Override
public byte[] serialize(final String topic, final Object data) {
if (data instanceof String) {
@@ -560,13 +560,11 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords2.size());
Record record = processedRecords1.get(0);
- Record expectedResult = new Record(consumerRecord1);
- expectedResult.offset = 0L;
+ Record expectedResult = new Record(consumerRecord1, 0L);
assertThat(record, equalTo(expectedResult));
record = processedRecords2.get(0);
- expectedResult = new Record(consumerRecord2);
- expectedResult.offset = 0L;
+ expectedResult = new Record(consumerRecord2, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -601,8 +599,7 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords.size());
final Record record = processedRecords.get(0);
- final Record expectedResult = new Record(consumerRecord1);
- expectedResult.offset = 0L;
+ final Record expectedResult = new Record(consumerRecord1, 0L);
assertThat(record, equalTo(expectedResult));
}
@@ -687,13 +684,14 @@ public class TopologyTestDriverTest {
@Test
public void shouldReturnAllStores() {
final Topology topology = setupSourceSinkTopology();
+ topology.addProcessor("processor", () -> null);
topology.addStateStore(
new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore("store"),
Serdes.ByteArray(),
Serdes.ByteArray(),
- new SystemTime())
- .withLoggingDisabled());
+ new SystemTime()),
+ "processor");
topology.addGlobalStore(
new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore("globalStore"),
@@ -705,12 +703,41 @@ public class TopologyTestDriverTest {
Serdes.ByteArray().deserializer(),
"globalTopicName",
"globalProcessorName",
- new ProcessorSupplier() {
- @Override
- public Processor get() {
- return null;
- }
- });
+ () -> null);
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ final Set<String> expectedStoreNames = new HashSet<>();
+ expectedStoreNames.add("store");
+ expectedStoreNames.add("globalStore");
+ final Map<String, StateStore> allStores = testDriver.getAllStateStores();
+ assertThat(allStores.keySet(), equalTo(expectedStoreNames));
+ for (final StateStore store : allStores.values()) {
+ assertNotNull(store);
+ }
+ }
+
+ @Test
+ public void shouldReturnAllStoresNames() {
+ final Topology topology = setupSourceSinkTopology();
+ topology.addStateStore(
+ new KeyValueStoreBuilder<>(
+ Stores.inMemoryKeyValueStore("store"),
+ Serdes.ByteArray(),
+ Serdes.ByteArray(),
+ new SystemTime()));
+ topology.addGlobalStore(
+ new KeyValueStoreBuilder<>(
+ Stores.inMemoryKeyValueStore("globalStore"),
+ Serdes.ByteArray(),
+ Serdes.ByteArray(),
+ new SystemTime()).withLoggingDisabled(),
+ "sourceProcessorName",
+ Serdes.ByteArray().deserializer(),
+ Serdes.ByteArray().deserializer(),
+ "globalTopicName",
+ "globalProcessorName",
+ () -> null);
testDriver = new TopologyTestDriver(topology, config);
@@ -721,13 +748,13 @@ public class TopologyTestDriverTest {
}
private void setup() {
- Topology topology = new Topology();
+ final Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
topology.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("aggStore"),
Serdes.String(),
- Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
+ Serdes.Long()),
"aggregator");
topology.addSink("sinkProcessor", "result-topic", "aggregator");
@@ -812,18 +839,8 @@ public class TopologyTestDriverTest {
@Override
public void init(final ProcessorContext context) {
this.context = context;
- context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- flushStore();
- }
- });
- context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
- @Override
- public void punctuate(final long timestamp) {
- flushStore();
- }
- });
+ context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
+ context.schedule(10000, PunctuationType.STREAM_TIME, timestamp -> flushStore());
store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
}
@@ -908,7 +925,7 @@ public class TopologyTestDriverTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.globalTable("topic",
Consumed.with(Serdes.String(), Serdes.String()),
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalStore"));
+ Materialized.as("globalStore"));
try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
Assert.assertNotNull(globalStore);
@@ -956,8 +973,7 @@ public class TopologyTestDriverTest {
assertEquals(0, processedRecords2.size());
final Record record1 = processedRecords1.get(0);
- final Record expectedResult1 = new Record(consumerRecord1);
- expectedResult1.offset = 0L;
+ final Record expectedResult1 = new Record(consumerRecord1, 0L);
assertThat(record1, equalTo(expectedResult1));
testDriver.pipeInput(consumerRecord2);
@@ -966,8 +982,7 @@ public class TopologyTestDriverTest {
assertEquals(1, processedRecords2.size());
final Record record2 = processedRecords2.get(0);
- final Record expectedResult2 = new Record(consumerRecord2);
- expectedResult2.offset = 0L;
+ final Record expectedResult2 = new Record(consumerRecord2, 0L);
assertThat(record2, equalTo(expectedResult2));
}
@@ -1004,7 +1019,7 @@ public class TopologyTestDriverTest {
try {
testDriver.pipeInput(consumerRecord1);
} catch (final TopologyException exception) {
- String str =
+ final String str =
String.format(
"Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for " +
"input record topic: %s and hence cannot process the message.",
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.