You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:29 UTC
[15/50] [abbrv] kafka git commit: KAFKA-3607: Close KStreamTestDriver
upon completing; follow-up fixes to be tracked in KAFKA-3623
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9cafe8b..efb17fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -26,11 +26,13 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -39,7 +41,23 @@ import static org.junit.Assert.assertTrue;
public class KTableMapValuesTest {
- final private Serde<String> stringSerde = new Serdes.StringSerde();
+ final private Serde<String> stringSerde = Serdes.String();
+
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
+
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
@Test
public void testKTable() {
@@ -58,7 +76,7 @@ public class KTableMapValuesTest {
MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
driver.process(topic1, "A", "01");
driver.process(topic1, "B", "02");
@@ -70,230 +88,211 @@ public class KTableMapValuesTest {
@Test
public void testValueGetter() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- });
- KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(stringSerde, stringSerde, topic2);
-
- KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
- KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
- KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- getter2.init(driver.context());
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter3.init(driver.context());
- KTableValueGetter<String, String> getter4 = getterSupplier4.get();
- getter4.init(driver.context());
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
-
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(1), getter2.get("A"));
- assertEquals(new Integer(1), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("01", getter4.get("A"));
- assertEquals("01", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
-
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(2), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertEquals(new Integer(2), getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("02", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "03");
-
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(3), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("03", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", null);
-
- assertNull(getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertNull(getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertNull(getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- } finally {
- Utils.delete(stateDir);
- }
+ KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+ table1.through(stringSerde, stringSerde, topic2);
+
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ getter2.init(driver.context());
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ getter3.init(driver.context());
+ KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ getter4.init(driver.context());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "03");
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", null);
+
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
}
@Test
public void testNotSendingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
+ KStreamBuilder builder = new KStreamBuilder();
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
+ String topic1 = "topic1";
- MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
- builder.addProcessor("proc", proc, table2.name);
+ MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ builder.addProcessor("proc", proc, table2.name);
- assertFalse(table1.sendingOldValueEnabled());
- assertFalse(table2.sendingOldValueEnabled());
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
+ assertFalse(table1.sendingOldValueEnabled());
+ assertFalse(table2.sendingOldValueEnabled());
- proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
+ proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
- driver.process(topic1, "A", "03");
+ proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- proc.checkAndClearProcessResult("A:(3<-null)");
+ driver.process(topic1, "A", "03");
- driver.process(topic1, "A", null);
+ proc.checkAndClearProcessResult("A:(3<-null)");
- proc.checkAndClearProcessResult("A:(null<-null)");
+ driver.process(topic1, "A", null);
- } finally {
- Utils.delete(stateDir);
- }
+ proc.checkAndClearProcessResult("A:(null<-null)");
}
@Test
public void testSendingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
+ KStreamBuilder builder = new KStreamBuilder();
- KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
- new ValueMapper<String, Integer>() {
- @Override
- public Integer apply(String value) {
- return new Integer(value);
- }
- });
+ String topic1 = "topic1";
- table2.enableSendingOldValues();
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
- MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+ table2.enableSendingOldValues();
- builder.addProcessor("proc", proc, table2.name);
+ MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ builder.addProcessor("proc", proc, table2.name);
- assertTrue(table1.sendingOldValueEnabled());
- assertTrue(table2.sendingOldValueEnabled());
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
+ assertTrue(table1.sendingOldValueEnabled());
+ assertTrue(table2.sendingOldValueEnabled());
- proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
+ proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
- driver.process(topic1, "A", "03");
+ proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
- proc.checkAndClearProcessResult("A:(3<-2)");
+ driver.process(topic1, "A", "03");
- driver.process(topic1, "A", null);
+ proc.checkAndClearProcessResult("A:(3<-2)");
- proc.checkAndClearProcessResult("A:(null<-3)");
+ driver.process(topic1, "A", null);
- } finally {
- Utils.delete(stateDir);
- }
+ proc.checkAndClearProcessResult("A:(null<-3)");
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 7c158e2..aaa6cc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -24,11 +24,13 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -36,7 +38,23 @@ import static org.junit.Assert.assertTrue;
public class KTableSourceTest {
- final private Serde<String> stringSerde = new Serdes.StringSerde();
+ final private Serde<String> stringSerde = Serdes.String();
+
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
+
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
@Test
public void testKTable() {
@@ -49,7 +67,7 @@ public class KTableSourceTest {
MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
driver.process(topic1, "A", 1);
driver.process(topic1, "B", 2);
@@ -63,138 +81,120 @@ public class KTableSourceTest {
@Test
public void testValueGetter() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
+ final KStreamBuilder builder = new KStreamBuilder();
- String topic1 = "topic1";
+ String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- driver.process(topic1, "A", "03");
+ driver.process(topic1, "A", "03");
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
- assertNull(getter1.get("A"));
- assertNull(getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertNull(getter1.get("A"));
+ assertNull(getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- } finally {
- Utils.delete(stateDir);
- }
}
@Test
public void testNotSedingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
+ final KStreamBuilder builder = new KStreamBuilder();
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ String topic1 = "topic1";
- MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- builder.addProcessor("proc1", proc1, table1.name);
+ MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ builder.addProcessor("proc1", proc1, table1.name);
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
+ proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
- proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
- driver.process(topic1, "A", "03");
+ proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
- proc1.checkAndClearProcessResult("A:(03<-null)");
+ driver.process(topic1, "A", "03");
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ proc1.checkAndClearProcessResult("A:(03<-null)");
- proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
- } finally {
- Utils.delete(stateDir);
- }
+ proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
}
@Test
public void testSedingOldValue() throws IOException {
- File stateDir = Files.createTempDirectory("test").toFile();
- try {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
+ final KStreamBuilder builder = new KStreamBuilder();
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ String topic1 = "topic1";
- table1.enableSendingOldValues();
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
- assertTrue(table1.sendingOldValueEnabled());
+ table1.enableSendingOldValues();
- MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+ assertTrue(table1.sendingOldValueEnabled());
- builder.addProcessor("proc1", proc1, table1.name);
+ MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+ builder.addProcessor("proc1", proc1, table1.name);
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
+ driver = new KStreamTestDriver(builder, stateDir, null, null);
- proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
+ proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
- proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
- driver.process(topic1, "A", "03");
+ proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
- proc1.checkAndClearProcessResult("A:(03<-02)");
+ driver.process(topic1, "A", "03");
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ proc1.checkAndClearProcessResult("A:(03<-02)");
- proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
- } finally {
- Utils.delete(stateDir);
- }
+ proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
index 22948ab..c8707af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
@@ -39,11 +40,19 @@ public class KeyValuePrinterProcessorTest {
private String topicName = "topic";
private Serde<String> stringSerde = Serdes.String();
- private Serde<byte[]> bytesSerde = Serdes.ByteArray();
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
private KStreamBuilder builder = new KStreamBuilder();
private PrintStream printStream = new PrintStream(baos);
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testPrintKeyValueDefaultSerde() throws Exception {
@@ -57,7 +66,7 @@ public class KeyValuePrinterProcessorTest {
KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
stream.process(keyValuePrinter);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < suppliedKeys.length; i++) {
driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
}
@@ -79,7 +88,7 @@ public class KeyValuePrinterProcessorTest {
stream.process(keyValuePrinter);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
String suppliedKey = null;
byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index d738794..7316804 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -82,6 +83,12 @@ public class KStreamTestDriver {
public void process(String topicName, Object key, Object value) {
currNode = topology.source(topicName);
+
+ // if currNode is null, check if this topic is a changelog topic;
+ // if yes, skip
+ if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+ return;
+
try {
forward(key, value);
} finally {
@@ -108,10 +115,6 @@ public class KStreamTestDriver {
context.setTime(timestamp);
}
- public StateStore getStateStore(String name) {
- return context.getStateStore(name);
- }
-
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value) {
ProcessorNode thisNode = currNode;
@@ -153,6 +156,23 @@ public class KStreamTestDriver {
}
}
+ public void close() {
+ // close all processors
+ for (ProcessorNode node : topology.processors()) {
+ currNode = node;
+ try {
+ node.close();
+ } finally {
+ currNode = null;
+ }
+ }
+
+ // close all state stores
+ for (StateStore store : context.allStateStores().values()) {
+ store.close();
+ }
+ }
+
public Set<String> allProcessorNames() {
Set<String> names = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
index ae8c2fd..769ee71 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -26,7 +26,7 @@ public class MockKeyValueMapper {
@Override
public KeyValue<K, V> apply(K key, V value) {
- return new KeyValue<>(key, value);
+ return KeyValue.pair(key, value);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
new file mode 100644
index 0000000..4d44166
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+public class MockValueJoiner {
+
+ private static class StringJoin implements ValueJoiner<String, String, String> {
+
+ @Override
+ public String apply(String value1, String value2) {
+ return value1 + "+" + value2;
+ }
+ };
+
+ public final static ValueJoiner<String, String, String> STRING_JOINER = new StringJoin();
+}
\ No newline at end of file