You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/09 23:29:33 UTC
[kafka] branch trunk updated: KAFKA-3625: TopologyTestDriver must
process output for wall-clock-time punctuations and on close() (#4502)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc54c0e KAFKA-3625: TopologyTestDriver must process output for wall-clock-time punctuations and on close() (#4502)
dc54c0e is described below
commit dc54c0e24b3f7ca27990b1d576b8b8fd6d740ca1
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Feb 9 15:29:21 2018 -0800
KAFKA-3625: TopologyTestDriver must process output for wall-clock-time punctuations and on close() (#4502)
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../apache/kafka/streams/TopologyTestDriver.java | 64 +++++----
.../kafka/streams/TopologyTestDriverTest.java | 159 ++++++++++++++++++++-
2 files changed, 188 insertions(+), 35 deletions(-)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index ff63554..a108f22 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -328,42 +328,14 @@ public class TopologyTestDriver {
consumerRecord.serializedValueSize(),
consumerRecord.key(),
consumerRecord.value())));
- producer.clear();
// Process the record ...
((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
task.process();
task.maybePunctuateStreamTime();
task.commit();
+ captureOutputRecords();
- // Capture all the records sent to the producer ...
- for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
- Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
- if (outputRecords == null) {
- outputRecords = new LinkedList<>();
- outputRecordsByTopic.put(record.topic(), outputRecords);
- }
- outputRecords.add(record);
-
- // Forward back into the topology if the produced record is to an internal or a source topic ...
- final String outputTopicName = record.topic();
- if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
- final byte[] serializedKey = record.key();
- final byte[] serializedValue = record.value();
-
- pipeInput(new ConsumerRecord<>(
- outputTopicName,
- -1,
- -1L,
- record.timestamp(),
- TimestampType.CREATE_TIME,
- 0L,
- serializedKey == null ? 0 : serializedKey.length,
- serializedValue == null ? 0 : serializedValue.length,
- serializedKey,
- serializedValue));
- }
- }
} else {
final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
if (globalTopicPartition == null) {
@@ -385,6 +357,38 @@ public class TopologyTestDriver {
}
}
+ private void captureOutputRecords() {
+ // Capture all the records sent to the producer ...
+ final List<ProducerRecord<byte[], byte[]>> output = producer.history();
+ producer.clear();
+ for (final ProducerRecord<byte[], byte[]> record : output) {
+ Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
+ if (outputRecords == null) {
+ outputRecords = new LinkedList<>();
+ outputRecordsByTopic.put(record.topic(), outputRecords);
+ }
+ outputRecords.add(record);
+
+ // Forward back into the topology if the produced record is to an internal or a source topic ...
+ final String outputTopicName = record.topic();
+ if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
+ final byte[] serializedKey = record.key();
+ final byte[] serializedValue = record.value();
+
+ pipeInput(new ConsumerRecord<>(
+ outputTopicName,
+ -1,
+ -1L,
+ record.timestamp(),
+ TimestampType.CREATE_TIME,
+ 0L,
+ serializedKey == null ? 0 : serializedKey.length,
+ serializedValue == null ? 0 : serializedValue.length,
+ serializedKey,
+ serializedValue));
+ }
+ }
+ }
/**
* Send input messages to the topology and then commit each message individually.
*
@@ -407,6 +411,7 @@ public class TopologyTestDriver {
mockTime.sleep(advanceMs);
task.maybePunctuateSystemTime();
task.commit();
+ captureOutputRecords();
}
/**
@@ -558,6 +563,7 @@ public class TopologyTestDriver {
// ignore
}
}
+ captureOutputRecords();
}
static class MockTime implements Time {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 921f6d6..17d5e02 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -19,8 +19,12 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.processor.Processor;
@@ -28,12 +32,15 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@@ -81,6 +88,14 @@ public class TopologyTestDriverTest {
put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
}
};
+ private KeyValueStore<String, Long> store;
+
+ private StringDeserializer stringDeserializer = new StringDeserializer();
+ private LongDeserializer longDeserializer = new LongDeserializer();
+ private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(
+ new StringSerializer(),
+ new LongSerializer());
+
private final static class Record {
private Object key;
@@ -223,7 +238,9 @@ public class TopologyTestDriverTest {
@After
public void tearDown() {
- testDriver.close();
+ if (testDriver != null) {
+ testDriver.close();
+ }
}
private Topology setupSourceSinkTopology() {
@@ -417,7 +434,7 @@ public class TopologyTestDriverTest {
SINK_TOPIC_1,
new Serializer() {
@Override
- public byte[] serialize(String topic, Object data) {
+ public byte[] serialize(final String topic, final Object data) {
if (data instanceof Long) {
return Serdes.Long().serializer().serialize(topic, (Long) data);
}
@@ -426,11 +443,11 @@ public class TopologyTestDriverTest {
@Override
public void close() {}
@Override
- public void configure(Map configs, boolean isKey) {}
+ public void configure(final Map configs, final boolean isKey) {}
},
new Serializer() {
@Override
- public byte[] serialize(String topic, Object data) {
+ public byte[] serialize(final String topic, final Object data) {
if (data instanceof String) {
return Serdes.String().serializer().serialize(topic, (String) data);
}
@@ -439,7 +456,7 @@ public class TopologyTestDriverTest {
@Override
public void close() {}
@Override
- public void configure(Map configs, boolean isKey) {}
+ public void configure(final Map configs, final boolean isKey) {}
},
processor);
@@ -476,7 +493,7 @@ public class TopologyTestDriverTest {
}
@Test
- public void shouldUseSinkeSpecificSerializers() {
+ public void shouldUseSinkSpecificSerializers() {
final Topology topology = new Topology();
final String sourceName1 = "source-1";
@@ -691,4 +708,134 @@ public class TopologyTestDriverTest {
expectedStoreNames.add("globalStore");
assertThat(testDriver.getAllStateStores().keySet(), equalTo(expectedStoreNames));
}
+
+ private void setup() {
+ Topology topology = new Topology();
+ topology.addSource("sourceProcessor", "input-topic");
+ topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+ topology.addStateStore(Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("aggStore"),
+ Serdes.String(),
+ Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
+ "aggregator");
+ topology.addSink("sinkProcessor", "result-topic", "aggregator");
+
+ config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+ testDriver = new TopologyTestDriver(topology, config);
+
+ store = testDriver.getKeyValueStore("aggStore");
+ store.put("a", 21L);
+ }
+
+ @Test
+ public void shouldFlushStoreForFirstInput() {
+ setup();
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+ }
+
+ @Test
+ public void shouldNotUpdateStoreForSmallerValue() {
+ setup();
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+ Assert.assertThat(store.get("a"), equalTo(21L));
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+ }
+
+ @Test
+ public void shouldNotUpdateStoreForLargerValue() {
+ setup();
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
+ Assert.assertThat(store.get("a"), equalTo(42L));
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+ }
+
+ @Test
+ public void shouldUpdateStoreForNewKey() {
+ setup();
+ testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
+ Assert.assertThat(store.get("b"), equalTo(21L));
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+ }
+
+ @Test
+ public void shouldPunctuateIfEvenTimeAdvances() {
+ setup();
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+
+ testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+ }
+
+ @Test
+ public void shouldPunctuateIfWallClockTimeAdvances() {
+ setup();
+ testDriver.advanceWallClockTime(60000);
+ OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
+ Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
+ }
+
+ private class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
+ @Override
+ public Processor<String, Long> get() {
+ return new CustomMaxAggregator();
+ }
+ }
+
+ private class CustomMaxAggregator implements Processor<String, Long> {
+ ProcessorContext context;
+ private KeyValueStore<String, Long> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+ context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+ @Override
+ public void punctuate(final long timestamp) {
+ flushStore();
+ }
+ });
+ context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
+ @Override
+ public void punctuate(final long timestamp) {
+ flushStore();
+ }
+ });
+ store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
+ }
+
+ @Override
+ public void process(final String key, final Long value) {
+ final Long oldValue = store.get(key);
+ if (oldValue == null || value > oldValue) {
+ store.put(key, value);
+ }
+ }
+
+ private void flushStore() {
+ final KeyValueIterator<String, Long> it = store.all();
+ while (it.hasNext()) {
+ final KeyValue<String, Long> next = it.next();
+ context.forward(next.key, next.value);
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ }
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.