You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/26 20:39:53 UTC
[3/4] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 5f19b9e..1bd870e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import java.util.HashMap;
@@ -40,6 +41,16 @@ public class KStreamSelectKeyTest {
final private Serde<Integer> integerSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private KStreamTestDriver driver;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testSelectKey() {
KStreamBuilder builder = new KStreamBuilder();
@@ -66,7 +77,7 @@ public class KStreamSelectKeyTest {
stream.selectKey(selector).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int expectedValue : expectedValues) {
driver.process(topicName, null, expectedValue);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index a0a61f2..e0bdfbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -37,6 +38,16 @@ public class KStreamTransformTest {
final private Serde<Integer> intSerde = Serdes.Integer();
+ private KStreamTestDriver driver;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testTransform() {
KStreamBuilder builder = new KStreamBuilder();
@@ -76,7 +87,7 @@ public class KStreamTransformTest {
KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
stream.transform(transformerSupplier).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index f5f9698..aebcc76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -36,6 +37,16 @@ public class KStreamTransformValuesTest {
final private Serde<Integer> intSerde = Serdes.Integer();
+ private KStreamTestDriver driver;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testTransform() {
KStreamBuilder builder = new KStreamBuilder();
@@ -76,7 +87,7 @@ public class KStreamTransformValuesTest {
stream = builder.stream(intSerde, intSerde, topicName);
stream.transformValues(valueTransformerSupplier).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 3c7a1bd..828103a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -20,266 +20,257 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class KStreamWindowAggregateTest {
- final private Serde<String> strSerde = new Serdes.StringSerde();
+ final private Serde<String> strSerde = Serdes.String();
- private class StringAdd implements Aggregator<String, String, String> {
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
- @Override
- public String apply(String aggKey, String value, String aggregate) {
- return aggregate + "+" + value;
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
}
+ driver = null;
}
- private class StringInit implements Initializer<String> {
-
- @Override
- public String apply() {
- return "0";
- }
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
}
@Test
public void testAggBasic() throws Exception {
- final File baseDir = Files.createTempDirectory("test").toFile();
-
- try {
- final KStreamBuilder builder = new KStreamBuilder();
- String topic1 = "topic1";
-
- KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
- KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
- HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
- strSerde,
- strSerde);
-
- MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-
- driver.setTime(0L);
- driver.process(topic1, "A", "1");
- driver.setTime(1L);
- driver.process(topic1, "B", "2");
- driver.setTime(2L);
- driver.process(topic1, "C", "3");
- driver.setTime(3L);
- driver.process(topic1, "D", "4");
- driver.setTime(4L);
- driver.process(topic1, "A", "1");
-
- driver.setTime(5L);
- driver.process(topic1, "A", "1");
- driver.setTime(6L);
- driver.process(topic1, "B", "2");
- driver.setTime(7L);
- driver.process(topic1, "D", "4");
- driver.setTime(8L);
- driver.process(topic1, "B", "2");
- driver.setTime(9L);
- driver.process(topic1, "C", "3");
-
- driver.setTime(10L);
- driver.process(topic1, "A", "1");
- driver.setTime(11L);
- driver.process(topic1, "B", "2");
- driver.setTime(12L);
- driver.process(topic1, "D", "4");
- driver.setTime(13L);
- driver.process(topic1, "B", "2");
- driver.setTime(14L);
- driver.process(topic1, "C", "3");
-
- assertEquals(Utils.mkList(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1",
-
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3",
-
- "[A@5]:0+1+1", "[A@10]:0+1",
- "[B@5]:0+2+2+2", "[B@10]:0+2",
- "[D@5]:0+4+4", "[D@10]:0+4",
- "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
- "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
-
- } finally {
- Utils.delete(baseDir);
- }
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+ KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+ strSerde,
+ strSerde);
+
+ MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ driver = new KStreamTestDriver(builder, stateDir);
+
+ driver.setTime(0L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(1L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(2L);
+ driver.process(topic1, "C", "3");
+ driver.setTime(3L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(4L);
+ driver.process(topic1, "A", "1");
+
+ driver.setTime(5L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(6L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(7L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(8L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(9L);
+ driver.process(topic1, "C", "3");
+
+ driver.setTime(10L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(11L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(12L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(13L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(14L);
+ driver.process(topic1, "C", "3");
+
+ assertEquals(Utils.mkList(
+ "[A@0]:0+1",
+ "[B@0]:0+2",
+ "[C@0]:0+3",
+ "[D@0]:0+4",
+ "[A@0]:0+1+1",
+
+ "[A@0]:0+1+1+1", "[A@5]:0+1",
+ "[B@0]:0+2+2", "[B@5]:0+2",
+ "[D@0]:0+4+4", "[D@5]:0+4",
+ "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+ "[C@0]:0+3+3", "[C@5]:0+3",
+
+ "[A@5]:0+1+1", "[A@10]:0+1",
+ "[B@5]:0+2+2+2", "[B@10]:0+2",
+ "[D@5]:0+4+4", "[D@10]:0+4",
+ "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+ "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
}
@Test
public void testJoin() throws Exception {
- final File baseDir = Files.createTempDirectory("test").toFile();
-
- try {
- final KStreamBuilder builder = new KStreamBuilder();
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
- KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
- HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
- strSerde,
- strSerde);
-
- MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
- table1.toStream().process(proc1);
-
- KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
- KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(),
- HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
- strSerde,
- strSerde);
-
- MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
-
- MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
- table1.join(table2, new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String p1, String p2) {
- return p1 + "%" + p2;
- }
- }).toStream().process(proc3);
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-
- driver.setTime(0L);
- driver.process(topic1, "A", "1");
- driver.setTime(1L);
- driver.process(topic1, "B", "2");
- driver.setTime(2L);
- driver.process(topic1, "C", "3");
- driver.setTime(3L);
- driver.process(topic1, "D", "4");
- driver.setTime(4L);
- driver.process(topic1, "A", "1");
-
- proc1.checkAndClearProcessResult(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1"
- );
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult(
- "[A@0]:null",
- "[B@0]:null",
- "[C@0]:null",
- "[D@0]:null",
- "[A@0]:null"
- );
-
- driver.setTime(5L);
- driver.process(topic1, "A", "1");
- driver.setTime(6L);
- driver.process(topic1, "B", "2");
- driver.setTime(7L);
- driver.process(topic1, "D", "4");
- driver.setTime(8L);
- driver.process(topic1, "B", "2");
- driver.setTime(9L);
- driver.process(topic1, "C", "3");
-
- proc1.checkAndClearProcessResult(
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3"
- );
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult(
- "[A@0]:null", "[A@5]:null",
- "[B@0]:null", "[B@5]:null",
- "[D@0]:null", "[D@5]:null",
- "[B@0]:null", "[B@5]:null",
- "[C@0]:null", "[C@5]:null"
- );
-
- driver.setTime(0L);
- driver.process(topic2, "A", "a");
- driver.setTime(1L);
- driver.process(topic2, "B", "b");
- driver.setTime(2L);
- driver.process(topic2, "C", "c");
- driver.setTime(3L);
- driver.process(topic2, "D", "d");
- driver.setTime(4L);
- driver.process(topic2, "A", "a");
-
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
- "[A@0]:0+a",
- "[B@0]:0+b",
- "[C@0]:0+c",
- "[D@0]:0+d",
- "[A@0]:0+a+a"
- );
- proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a",
- "[B@0]:0+2+2+2%0+b",
- "[C@0]:0+3+3%0+c",
- "[D@0]:0+4+4%0+d",
- "[A@0]:0+1+1+1%0+a+a");
-
- driver.setTime(5L);
- driver.process(topic2, "A", "a");
- driver.setTime(6L);
- driver.process(topic2, "B", "b");
- driver.setTime(7L);
- driver.process(topic2, "D", "d");
- driver.setTime(8L);
- driver.process(topic2, "B", "b");
- driver.setTime(9L);
- driver.process(topic2, "C", "c");
-
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
- "[A@0]:0+a+a+a", "[A@5]:0+a",
- "[B@0]:0+b+b", "[B@5]:0+b",
- "[D@0]:0+d+d", "[D@5]:0+d",
- "[B@0]:0+b+b+b", "[B@5]:0+b+b",
- "[C@0]:0+c+c", "[C@5]:0+c"
- );
- proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
- "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
- "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
- "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
- "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
- );
-
- } finally {
- Utils.delete(baseDir);
- }
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+ KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+ strSerde,
+ strSerde);
+
+ MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+ table1.toStream().process(proc1);
+
+ KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+ KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
+ strSerde,
+ strSerde);
+
+ MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+
+ MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+ table1.join(table2, new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(String p1, String p2) {
+ return p1 + "%" + p2;
+ }
+ }).toStream().process(proc3);
+
+ driver = new KStreamTestDriver(builder, stateDir);
+
+ driver.setTime(0L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(1L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(2L);
+ driver.process(topic1, "C", "3");
+ driver.setTime(3L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(4L);
+ driver.process(topic1, "A", "1");
+
+ proc1.checkAndClearProcessResult(
+ "[A@0]:0+1",
+ "[B@0]:0+2",
+ "[C@0]:0+3",
+ "[D@0]:0+4",
+ "[A@0]:0+1+1"
+ );
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult(
+ "[A@0]:null",
+ "[B@0]:null",
+ "[C@0]:null",
+ "[D@0]:null",
+ "[A@0]:null"
+ );
+
+ driver.setTime(5L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(6L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(7L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(8L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(9L);
+ driver.process(topic1, "C", "3");
+
+ proc1.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1", "[A@5]:0+1",
+ "[B@0]:0+2+2", "[B@5]:0+2",
+ "[D@0]:0+4+4", "[D@5]:0+4",
+ "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+ "[C@0]:0+3+3", "[C@5]:0+3"
+ );
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult(
+ "[A@0]:null", "[A@5]:null",
+ "[B@0]:null", "[B@5]:null",
+ "[D@0]:null", "[D@5]:null",
+ "[B@0]:null", "[B@5]:null",
+ "[C@0]:null", "[C@5]:null"
+ );
+
+ driver.setTime(0L);
+ driver.process(topic2, "A", "a");
+ driver.setTime(1L);
+ driver.process(topic2, "B", "b");
+ driver.setTime(2L);
+ driver.process(topic2, "C", "c");
+ driver.setTime(3L);
+ driver.process(topic2, "D", "d");
+ driver.setTime(4L);
+ driver.process(topic2, "A", "a");
+
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
+ "[A@0]:0+a",
+ "[B@0]:0+b",
+ "[C@0]:0+c",
+ "[D@0]:0+d",
+ "[A@0]:0+a+a"
+ );
+ proc3.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1%0+a",
+ "[B@0]:0+2+2+2%0+b",
+ "[C@0]:0+3+3%0+c",
+ "[D@0]:0+4+4%0+d",
+ "[A@0]:0+1+1+1%0+a+a");
+
+ driver.setTime(5L);
+ driver.process(topic2, "A", "a");
+ driver.setTime(6L);
+ driver.process(topic2, "B", "b");
+ driver.setTime(7L);
+ driver.process(topic2, "D", "d");
+ driver.setTime(8L);
+ driver.process(topic2, "B", "b");
+ driver.setTime(9L);
+ driver.process(topic2, "C", "c");
+
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
+ "[A@0]:0+a+a+a", "[A@5]:0+a",
+ "[B@0]:0+b+b", "[B@5]:0+b",
+ "[D@0]:0+d+d", "[D@5]:0+d",
+ "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+ "[C@0]:0+c+c", "[C@5]:0+c"
+ );
+ proc3.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+ "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+ "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+ "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+ "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+ );
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index be0ec19..a614479 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -27,61 +27,73 @@ import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class KTableAggregateTest {
- final private Serde<String> stringSerde = new Serdes.StringSerde();
+ final private Serde<String> stringSerde = Serdes.String();
- @Test
- public void testAggBasic() throws Exception {
- final File baseDir = Files.createTempDirectory("test").toFile();
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
- try {
- final KStreamBuilder builder = new KStreamBuilder();
- String topic1 = "topic1";
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
- KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
- stringSerde,
- stringSerde
- ).aggregate(MockInitializer.STRING_INIT,
- MockAggregator.STRING_ADDER,
- MockAggregator.STRING_REMOVER,
- stringSerde,
- "topic1-Canonized");
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
- MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
+ @Test
+ public void testAggBasic() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ String topic1 = "topic1";
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+ KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+ KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
+ stringSerde,
+ stringSerde
+ ).aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.STRING_ADDER,
+ MockAggregator.STRING_REMOVER,
+ stringSerde,
+ "topic1-Canonized");
- driver.process(topic1, "A", "1");
- driver.process(topic1, "B", "2");
- driver.process(topic1, "A", "3");
- driver.process(topic1, "B", "4");
- driver.process(topic1, "C", "5");
- driver.process(topic1, "D", "6");
- driver.process(topic1, "B", "7");
- driver.process(topic1, "C", "8");
+ MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
- assertEquals(Utils.mkList(
- "A:0+1",
- "B:0+2",
- "A:0+1+3", "A:0+1+3-1",
- "B:0+2+4", "B:0+2+4-2",
- "C:0+5",
- "D:0+6",
- "B:0+2+4-2+7", "B:0+2+4-2+7-4",
- "C:0+5+8", "C:0+5+8-5"), proc2.processed);
+ driver = new KStreamTestDriver(builder, stateDir);
- } finally {
- Utils.delete(baseDir);
- }
+ driver.process(topic1, "A", "1");
+ driver.process(topic1, "B", "2");
+ driver.process(topic1, "A", "3");
+ driver.process(topic1, "B", "4");
+ driver.process(topic1, "C", "5");
+ driver.process(topic1, "D", "6");
+ driver.process(topic1, "B", "7");
+ driver.process(topic1, "C", "8");
+
+ assertEquals(Utils.mkList(
+ "A:0+1",
+ "B:0+2",
+ "A:0+1+3", "A:0+1+3-1",
+ "B:0+2+4", "B:0+2+4-2",
+ "C:0+5",
+ "D:0+6",
+ "B:0+2+4-2+7", "B:0+2+4-2+7-4",
+ "C:0+5+8", "C:0+5+8-5"), proc2.processed);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index ee26058..a3af133 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,25 +19,42 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class KTableFilterTest {
- final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
- final private Serde<String> stringSerde = new Serdes.StringSerde();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
+
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
+
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
@Test
public void testKTable() {
@@ -65,7 +82,7 @@ public class KTableFilterTest {
table2.toStream().process(proc2);
table3.toStream().process(proc3);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
driver.process(topic1, "A", 1);
driver.process(topic1, "B", 2);
@@ -80,199 +97,181 @@ public class KTableFilterTest {
@Test
public void testValueGetter() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
-
- KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
- KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
-
- KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-
- getter2.init(driver.context());
- getter3.init(driver.context());
-
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 1);
- driver.process(topic1, "C", 1);
-
- assertNull(getter2.get("A"));
- assertNull(getter2.get("B"));
- assertNull(getter2.get("C"));
-
- assertEquals(1, (int) getter3.get("A"));
- assertEquals(1, (int) getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
-
- driver.process(topic1, "A", 2);
- driver.process(topic1, "B", 2);
-
- assertEquals(2, (int) getter2.get("A"));
- assertEquals(2, (int) getter2.get("B"));
- assertNull(getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
-
- driver.process(topic1, "A", 3);
-
- assertNull(getter2.get("A"));
- assertEquals(2, (int) getter2.get("B"));
- assertNull(getter2.get("C"));
-
- assertEquals(3, (int) getter3.get("A"));
- assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
-
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
-
- assertNull(getter2.get("A"));
- assertNull(getter2.get("B"));
- assertNull(getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
-
- } finally {
- Utils.delete(stateDir);
- }
+ KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTableImpl<String, Integer, Integer> table1 =
+ (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+ KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+
+ getter2.init(driver.context());
+ getter3.init(driver.context());
+
+ driver.process(topic1, "A", 1);
+ driver.process(topic1, "B", 1);
+ driver.process(topic1, "C", 1);
+
+ assertNull(getter2.get("A"));
+ assertNull(getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ assertEquals(1, (int) getter3.get("A"));
+ assertEquals(1, (int) getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
+
+ driver.process(topic1, "A", 2);
+ driver.process(topic1, "B", 2);
+
+ assertEquals(2, (int) getter2.get("A"));
+ assertEquals(2, (int) getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
+
+ driver.process(topic1, "A", 3);
+
+ assertNull(getter2.get("A"));
+ assertEquals(2, (int) getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ assertEquals(3, (int) getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
+
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
+
+ assertNull(getter2.get("A"));
+ assertNull(getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
}
@Test
public void testNotSendingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
-
- KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
- KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
+ KStreamBuilder builder = new KStreamBuilder();
- MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ String topic1 = "topic1";
- builder.addProcessor("proc1", proc1, table1.name);
- builder.addProcessor("proc2", proc2, table2.name);
+ KTableImpl<String, Integer, Integer> table1 =
+ (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+ KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ builder.addProcessor("proc1", proc1, table1.name);
+ builder.addProcessor("proc2", proc2, table2.name);
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 1);
- driver.process(topic1, "C", 1);
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+ driver.process(topic1, "A", 1);
+ driver.process(topic1, "B", 1);
+ driver.process(topic1, "C", 1);
- driver.process(topic1, "A", 2);
- driver.process(topic1, "B", 2);
+ proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
- proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ driver.process(topic1, "A", 2);
+ driver.process(topic1, "B", 2);
- driver.process(topic1, "A", 3);
+ proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- proc1.checkAndClearProcessResult("A:(3<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)");
+ driver.process(topic1, "A", 3);
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ proc1.checkAndClearProcessResult("A:(3<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)");
- proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
- } finally {
- Utils.delete(stateDir);
- }
+ proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
}
@Test
public void testSendingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
+ KStreamBuilder builder = new KStreamBuilder();
- KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
- KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
+ String topic1 = "topic1";
- table2.enableSendingOldValues();
+ KTableImpl<String, Integer, Integer> table1 =
+ (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+ KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
- MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ table2.enableSendingOldValues();
- builder.addProcessor("proc1", proc1, table1.name);
- builder.addProcessor("proc2", proc2, table2.name);
+ MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ builder.addProcessor("proc1", proc1, table1.name);
+ builder.addProcessor("proc2", proc2, table2.name);
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 1);
- driver.process(topic1, "C", 1);
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+ driver.process(topic1, "A", 1);
+ driver.process(topic1, "B", 1);
+ driver.process(topic1, "C", 1);
- driver.process(topic1, "A", 2);
- driver.process(topic1, "B", 2);
+ proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
- proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
- proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ driver.process(topic1, "A", 2);
+ driver.process(topic1, "B", 2);
- driver.process(topic1, "A", 3);
+ proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+ proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- proc1.checkAndClearProcessResult("A:(3<-2)");
- proc2.checkAndClearProcessResult("A:(null<-2)");
+ driver.process(topic1, "A", 3);
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ proc1.checkAndClearProcessResult("A:(3<-2)");
+ proc2.checkAndClearProcessResult("A:(null<-2)");
- proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
- proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
- } finally {
- Utils.delete(stateDir);
- }
+ proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 27a5114..af131c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
import org.junit.Test;
import java.util.List;
import java.util.Locale;
@@ -39,6 +40,16 @@ public class KTableForeachTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private KStreamTestDriver driver;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testForeach() {
// Given
@@ -71,7 +82,7 @@ public class KTableForeachTest {
table.foreach(action);
// Then
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (KeyValue<Integer, String> record: inputRecords) {
driver.process(topicName, record.key, record.value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 8a13e9a..ca3bbe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -33,11 +33,13 @@ import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -46,7 +48,23 @@ import static org.junit.Assert.assertTrue;
public class KTableImplTest {
- final private Serde<String> stringSerde = new Serdes.StringSerde();
+ final private Serde<String> stringSerde = Serdes.String();
+
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
+
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
@Test
public void testKTable() {
@@ -85,7 +103,7 @@ public class KTableImplTest {
MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
table4.toStream().process(proc4);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
driver.process(topic1, "A", "01");
driver.process(topic1, "B", "02");
@@ -100,129 +118,157 @@ public class KTableImplTest {
@Test
public void testValueGetter() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
- KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(stringSerde, stringSerde, topic2);
-
- KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
- // two state store should be created
- assertEquals(2, driver.allStateStores().size());
-
- KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- getter2.init(driver.context());
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter3.init(driver.context());
- KTableValueGetter<String, String> getter4 = getterSupplier4.get();
- getter4.init(driver.context());
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
-
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(1), getter2.get("A"));
- assertEquals(new Integer(1), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("01", getter4.get("A"));
- assertEquals("01", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
-
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(2), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertEquals(new Integer(2), getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("02", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "03");
-
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(3), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("03", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", null);
-
- assertNull(getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertNull(getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertNull(getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
+ final KStreamBuilder builder = new KStreamBuilder();
- } finally {
- Utils.delete(stateDir);
- }
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+ table1.through(stringSerde, stringSerde, topic2);
+
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+ // two state store should be created
+ assertEquals(2, driver.allStateStores().size());
+
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ getter2.init(driver.context());
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ getter3.init(driver.context());
+ KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ getter4.init(driver.context());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "03");
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", null);
+
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+ }
+
+ @Test
+ public void testStateStoreLazyEval() throws IOException {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, String> table2 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
+
+ KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
+ driver.setTime(0L);
+
+ // no state store should be created
+ assertEquals(0, driver.allStateStores().size());
}
@Test
@@ -230,120 +276,75 @@ public class KTableImplTest {
String topic1 = "topic1";
String topic2 = "topic2";
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- KStreamBuilder builder = new KStreamBuilder();
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
-
- KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
- KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
- driver.setTime(0L);
-
- // no state store should be created
- assertEquals(0, driver.allStateStores().size());
-
- } finally {
- Utils.delete(stateDir);
- }
+ final KStreamBuilder builder = new KStreamBuilder();
- try {
- KStreamBuilder builder = new KStreamBuilder();
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
-
- KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
- KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
- table2.join(table1MappedFiltered,
- new ValueJoiner<String, Integer, String>() {
- @Override
- public String apply(String v1, Integer v2) {
- return v1 + v2;
- }
- });
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
- driver.setTime(0L);
-
- // two state store should be created
- assertEquals(2, driver.allStateStores().size());
-
- } finally {
- Utils.delete(stateDir);
- }
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, String> table2 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
+
+ KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ table2.join(table1MappedFiltered,
+ new ValueJoiner<String, Integer, String>() {
+ @Override
+ public String apply(String v1, Integer v2) {
+ return v1 + v2;
+ }
+ });
+
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
+ driver.setTime(0L);
+
+ // two state store should be created
+ assertEquals(2, driver.allStateStores().size());
}
@Test
public void testRepartition() throws IOException {
String topic1 = "topic1";
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- KStreamBuilder builder = new KStreamBuilder();
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ final KStreamBuilder builder = new KStreamBuilder();
- KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
- .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
- .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
+ .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
- KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
- .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
- .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
- driver.setTime(0L);
+ KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
+ .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+ .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
- // three state store should be created, one for source, one for aggregate and one for reduce
- assertEquals(3, driver.allStateStores().size());
+ driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+ driver.setTime(0L);
- // contains the corresponding repartition source / sink nodes
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+ // three state store should be created, one for source, one for aggregate and one for reduce
+ assertEquals(3, driver.allStateStores().size());
- assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
- assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
- assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
- assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+ // contains the corresponding repartition source / sink nodes
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
- } finally {
- Utils.delete(stateDir);
- }
+ assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
+ assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
+ assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
+ assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
}
}