You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 21:33:15 UTC

[kafka] branch 2.0 updated: KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once (#4912)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b476b61  KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once (#4912)
b476b61 is described below

commit b476b61881a542c773808eb18fb246cafc2150db
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Thu Jun 14 03:01:51 2018 +0530

    KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once (#4912)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Ted Yu <yu...@gmail.com>
---
 .../apache/kafka/streams/TopologyTestDriver.java   |  9 +++++++++
 .../kafka/streams/TopologyTestDriverTest.java      | 23 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 155e54c..74fa8ca 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -196,6 +196,7 @@ public class TopologyTestDriver implements Closeable {
     private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
 
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
+    private final boolean eosEnabled;
 
     /**
      * Create a new test diver instance.
@@ -346,6 +347,7 @@ public class TopologyTestDriver implements Closeable {
             task = null;
             context = null;
         }
+        eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
     }
 
     /**
@@ -440,6 +442,10 @@ public class TopologyTestDriver implements Closeable {
         // Capture all the records sent to the producer ...
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
+        if (eosEnabled && !producer.closed()) {
+            producer.initTransactions();
+            producer.beginTransaction();
+        }
         for (final ProducerRecord<byte[], byte[]> record : output) {
             outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new LinkedList<>()).add(record);
 
@@ -667,6 +673,9 @@ public class TopologyTestDriver implements Closeable {
             }
         }
         captureOutputRecords();
+        if (!eosEnabled) {
+            producer.close();
+        }
         stateDirectory.clean();
     }
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7552637..135fb3f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -50,6 +50,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -61,6 +63,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Arrays;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -70,6 +73,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(value = Parameterized.class)
 public class TopologyTestDriverTest {
     private final static String SOURCE_TOPIC_1 = "source-topic-1";
     private final static String SOURCE_TOPIC_2 = "source-topic-2";
@@ -108,6 +112,23 @@ public class TopologyTestDriverTest {
         new StringSerializer(),
         new LongSerializer());
 
+    private final boolean eosEnabled;
+
+    @Parameterized.Parameters(name = "Eos enabled = {0}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean eosEnabled : Arrays.asList(true, false)) {
+            values.add(new Object[] {eosEnabled});
+        }
+        return values;
+    }
+
+    public TopologyTestDriverTest(final boolean eosEnabled) {
+        this.eosEnabled = eosEnabled;
+        if (eosEnabled) {
+            config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        }
+    }
 
     private final static class Record {
         private final Object key;
@@ -353,6 +374,8 @@ public class TopologyTestDriverTest {
 
         testDriver.close();
         assertTrue(mockProcessors.get(0).closed);
+        // As testDriver is already closed, bypassing @After tearDown testDriver.close().
+        testDriver = null;
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.