You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 21:33:15 UTC
[kafka] branch 2.0 updated: KAFKA-6749: Fixed TopologyTestDriver to
process stream processing guarantee as exactly once (#4912)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new b476b61 KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once (#4912)
b476b61 is described below
commit b476b61881a542c773808eb18fb246cafc2150db
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Thu Jun 14 03:01:51 2018 +0530
KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once (#4912)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Ted Yu <yu...@gmail.com>
---
.../apache/kafka/streams/TopologyTestDriver.java | 9 +++++++++
.../kafka/streams/TopologyTestDriverTest.java | 23 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 155e54c..74fa8ca 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -196,6 +196,7 @@ public class TopologyTestDriver implements Closeable {
private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
+ private final boolean eosEnabled;
/**
* Create a new test diver instance.
@@ -346,6 +347,7 @@ public class TopologyTestDriver implements Closeable {
task = null;
context = null;
}
+ eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
}
/**
@@ -440,6 +442,10 @@ public class TopologyTestDriver implements Closeable {
// Capture all the records sent to the producer ...
final List<ProducerRecord<byte[], byte[]>> output = producer.history();
producer.clear();
+ if (eosEnabled && !producer.closed()) {
+ producer.initTransactions();
+ producer.beginTransaction();
+ }
for (final ProducerRecord<byte[], byte[]> record : output) {
outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new LinkedList<>()).add(record);
@@ -667,6 +673,9 @@ public class TopologyTestDriver implements Closeable {
}
}
captureOutputRecords();
+ if (!eosEnabled) {
+ producer.close();
+ }
stateDirectory.clean();
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7552637..135fb3f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -50,6 +50,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
@@ -61,6 +63,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.Arrays;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -70,6 +73,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@RunWith(value = Parameterized.class)
public class TopologyTestDriverTest {
private final static String SOURCE_TOPIC_1 = "source-topic-1";
private final static String SOURCE_TOPIC_2 = "source-topic-2";
@@ -108,6 +112,23 @@ public class TopologyTestDriverTest {
new StringSerializer(),
new LongSerializer());
+ private final boolean eosEnabled;
+
+ @Parameterized.Parameters(name = "Eos enabled = {0}")
+ public static Collection<Object[]> data() {
+ final List<Object[]> values = new ArrayList<>();
+ for (final boolean eosEnabled : Arrays.asList(true, false)) {
+ values.add(new Object[] {eosEnabled});
+ }
+ return values;
+ }
+
+ public TopologyTestDriverTest(final boolean eosEnabled) {
+ this.eosEnabled = eosEnabled;
+ if (eosEnabled) {
+ config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ }
+ }
private final static class Record {
private final Object key;
@@ -353,6 +374,8 @@ public class TopologyTestDriverTest {
testDriver.close();
assertTrue(mockProcessors.get(0).closed);
+ // As testDriver is already closed, bypassing @After tearDown testDriver.close().
+ testDriver = null;
}
@Test
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.