You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:32 UTC
[18/50] [abbrv] kafka git commit: KAFKA-3607: Close KStreamTestDriver
upon completing; follow-up fixes to be tracked in KAFKA-3623
KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Eno Thereska, Michael G. Noll, Ismael Juma
Closes #1258 from guozhangwang/K3607
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a73629b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a73629b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a73629b
Branch: refs/heads/0.10.0
Commit: 1a73629bb43bbc781e5a968a61f6079365bc75b7
Parents: f60a3fa
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Apr 26 11:39:49 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 26 11:39:49 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/test/TestUtils.java | 37 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 9 +-
.../streams/kstream/KStreamBuilderTest.java | 13 +-
.../internals/KGroupedTableImplTest.java | 11 +-
.../kstream/internals/KStreamBranchTest.java | 13 +-
.../kstream/internals/KStreamFilterTest.java | 15 +-
.../kstream/internals/KStreamFlatMapTest.java | 13 +-
.../internals/KStreamFlatMapValuesTest.java | 13 +-
.../kstream/internals/KStreamForeachTest.java | 13 +-
.../internals/KStreamKStreamJoinTest.java | 651 +++++++++----------
.../internals/KStreamKStreamLeftJoinTest.java | 341 +++++-----
.../internals/KStreamKTableLeftJoinTest.java | 153 +++--
.../kstream/internals/KStreamMapTest.java | 13 +-
.../kstream/internals/KStreamMapValuesTest.java | 13 +-
.../kstream/internals/KStreamSelectKeyTest.java | 13 +-
.../kstream/internals/KStreamTransformTest.java | 13 +-
.../internals/KStreamTransformValuesTest.java | 13 +-
.../internals/KStreamWindowAggregateTest.java | 455 +++++++------
.../kstream/internals/KTableAggregateTest.java | 92 +--
.../kstream/internals/KTableFilterTest.java | 327 +++++-----
.../kstream/internals/KTableForeachTest.java | 13 +-
.../kstream/internals/KTableImplTest.java | 451 ++++++-------
.../kstream/internals/KTableKTableJoinTest.java | 394 ++++++-----
.../internals/KTableKTableLeftJoinTest.java | 397 ++++++-----
.../internals/KTableKTableOuterJoinTest.java | 426 ++++++------
.../kstream/internals/KTableMapKeysTest.java | 13 +-
.../kstream/internals/KTableMapValuesTest.java | 381 ++++++-----
.../kstream/internals/KTableSourceTest.java | 172 ++---
.../internals/KeyValuePrinterProcessorTest.java | 15 +-
.../apache/kafka/test/KStreamTestDriver.java | 28 +-
.../apache/kafka/test/MockKeyValueMapper.java | 2 +-
.../org/apache/kafka/test/MockValueJoiner.java | 33 +
32 files changed, 2349 insertions(+), 2197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 027221e..1bfe578 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -20,6 +20,8 @@ import static java.util.Arrays.asList;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -29,6 +31,7 @@ import java.util.Random;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
/**
@@ -97,12 +100,44 @@ public class TestUtils {
}
/**
- * Creates an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
+ * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
* suffix to generate its name.
*/
public static File tempFile() throws IOException {
File file = File.createTempFile("kafka", ".tmp");
file.deleteOnExit();
+
+ return file;
+ }
+
+ /**
+ * Create a temporary relative directory in the default temporary-file directory with the given prefix.
+ *
+ * @param prefix The prefix of the temporary directory; if null, "kafka-" is used as the default prefix
+ */
+ public static File tempDirectory(String prefix) throws IOException {
+ return tempDirectory(null, prefix);
+ }
+
+ /**
+ * Create a temporary relative directory in the specified parent directory with the given prefix.
+ *
+ * @param parent The parent folder path name; if null, the default temporary-file directory is used
+ * @param prefix The prefix of the temporary directory; if null, "kafka-" is used as the default prefix
+ */
+ public static File tempDirectory(Path parent, String prefix) throws IOException {
+ final File file = parent == null ?
+ Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() :
+ Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile();
+ file.deleteOnExit();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ Utils.delete(file);
+ }
+ });
+
return file;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index be7741d..6bd6c63 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -95,15 +95,8 @@ object TestUtils extends Logging {
def tempRelativeDir(parent: String): File = {
val parentFile = new File(parent)
parentFile.mkdirs()
- val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile
- f.deleteOnExit()
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run() = {
- Utils.delete(f)
- }
- })
- f
+ org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-");
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index e75b595..cdf28db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -22,12 +22,23 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class KStreamBuilderTest {
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test(expected = TopologyBuilderException.class)
public void testFrom() {
final KStreamBuilder builder = new KStreamBuilder();
@@ -66,7 +77,7 @@ public class KStreamBuilderTest {
MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
driver.setTime(0L);
driver.process(topic1, "A", "aa");
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 9eeea20..fc0451a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -18,19 +18,17 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
@@ -43,12 +41,7 @@ public class KGroupedTableImplTest {
@Before
public void setUp() throws IOException {
- stateDir = Files.createTempDirectory("test").toFile();
- }
-
- @After
- public void tearDown() throws IOException {
- Utils.delete(stateDir);
+ stateDir = TestUtils.tempDirectory("kafka-test");
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index e04a273..0650b95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import java.lang.reflect.Array;
@@ -33,6 +34,16 @@ public class KStreamBranchTest {
private String topicName = "topic";
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testKStreamBranch() {
@@ -74,7 +85,7 @@ public class KStreamBranchTest {
branches[i].process(processors[i]);
}
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 75465c8..4be8513 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -32,6 +33,16 @@ public class KStreamFilterTest {
private String topicName = "topic";
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
@Override
public boolean test(Integer key, String value) {
@@ -51,7 +62,7 @@ public class KStreamFilterTest {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.filter(isMultipleOfThree).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
@@ -71,7 +82,7 @@ public class KStreamFilterTest {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.filterNot(isMultipleOfThree).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index bc85757..da57d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -34,6 +35,16 @@ public class KStreamFlatMapTest {
private String topicName = "topic";
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testFlatMap() {
KStreamBuilder builder = new KStreamBuilder();
@@ -59,7 +70,7 @@ public class KStreamFlatMapTest {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.flatMap(mapper).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 63f5636..9d1141b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import java.util.ArrayList;
@@ -34,6 +35,16 @@ public class KStreamFlatMapValuesTest {
private String topicName = "topic";
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testFlatMapValues() {
KStreamBuilder builder = new KStreamBuilder();
@@ -58,7 +69,7 @@ public class KStreamFlatMapValuesTest {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.flatMapValues(mapper).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index d0a182d..0bc5e77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
import org.junit.Test;
import java.util.List;
import java.util.Locale;
@@ -39,6 +40,16 @@ public class KStreamForeachTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testForeach() {
// Given
@@ -71,7 +82,7 @@ public class KStreamForeachTest {
stream.foreach(action);
// Then
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (KeyValue<Integer, String> record: inputRecords) {
driver.process(topicName, record.key, record.value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 19a9411..6b0828a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -39,460 +41,447 @@ import static org.junit.Assert.assertEquals;
public class KStreamKStreamJoinTest {
- private String topic1 = "topic1";
- private String topic2 = "topic2";
+ final private String topic1 = "topic1";
+ final private String topic2 = "topic2";
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String value1, String value2) {
- return value1 + "+" + value2;
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
+
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
}
- };
+ driver = null;
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
@Test
public void testJoin() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
-
- KStreamBuilder builder = new KStreamBuilder();
-
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStream<Integer, String> joined;
- MockProcessorSupplier<Integer, String> processor;
+ KStreamBuilder builder = new KStreamBuilder();
- processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(intSerde, stringSerde, topic1);
- stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
- joined.process(processor);
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+ joined.process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
- driver.setTime(0L);
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- // push two items to the primary stream. the other window is empty
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X1, 1:X1 }
- // w2 = {}
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ driver = new KStreamTestDriver(builder, stateDir);
+ driver.setTime(0L);
- processor.checkAndClearProcessResult();
+ // push two items to the primary stream. the other window is empty
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X1, 1:X1 }
+ // w2 = {}
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X1, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X1, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
- // push all four items to the primary stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ // push all four items to the primary stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
- // push all items to the other stream. this should produce six items.
- // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ // push all items to the other stream. this should produce six items.
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- // push all four items to the primary stream. this should produce six items.
- // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- // push two items to the other stream. this should produce six item.
- // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
- processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ // push two items to the other stream. this should produce six items.
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
- } finally {
- Utils.delete(baseDir);
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
}
+
+ processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
}
@Test
public void testOuterJoin() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
+ KStreamBuilder builder = new KStreamBuilder();
- KStreamBuilder builder = new KStreamBuilder();
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStream<Integer, String> joined;
- MockProcessorSupplier<Integer, String> processor;
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+ joined.process(processor);
- processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(intSerde, stringSerde, topic1);
- stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
- joined.process(processor);
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+ driver = new KStreamTestDriver(builder, stateDir);
+ driver.setTime(0L);
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
- driver.setTime(0L);
+ // push two items to the primary stream. the other window is empty. this should produce two items.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X1, 1:X1 }
+ // w2 = {}
- // push two items to the primary stream. the other window is empty.this should produce two items
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X1, 1:X1 }
- // w2 = {}
-
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
-
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X1, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X1, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
- // push all four items to the primary stream. this should produce four items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ // push all four items to the primary stream. this should produce four items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
- // push all items to the other stream. this should produce six items.
- // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
- processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ // push all items to the other stream. this should produce six items.
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- // push all four items to the primary stream. this should produce six items.
- // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- // push two items to the other stream. this should produce six item.
- // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
- processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ // push two items to the other stream. this should produce six items.
+ // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
- } finally {
- Utils.delete(baseDir);
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
}
+
+ processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
}
@Test
public void testWindowing() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
-
- long time = 0L;
-
- KStreamBuilder builder = new KStreamBuilder();
-
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
+ long time = 0L;
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStream<Integer, String> joined;
- MockProcessorSupplier<Integer, String> processor;
+ KStreamBuilder builder = new KStreamBuilder();
- processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(intSerde, stringSerde, topic1);
- stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
- joined.process(processor);
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+ joined.process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
- driver.setTime(time);
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- // push two items to the primary stream. the other window is empty. this should produce no items.
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X1, 1:X1 }
- // w2 = {}
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ driver = new KStreamTestDriver(builder, stateDir);
+ driver.setTime(time);
- processor.checkAndClearProcessResult();
+ // push two items to the primary stream. the other window is empty. this should produce no items.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X1, 1:X1 }
+ // w2 = {}
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X1, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X1, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
- // clear logically
- time = 1000L;
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.setTime(time + i);
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- // gradually expires items in w1
- // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+ // clear logically
+ time = 1000L;
- time = 1000 + 100L;
- driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.setTime(time + i);
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
+ processor.checkAndClearProcessResult();
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ // gradually expires items in w1
+ // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time = 1000 + 100L;
+ driver.setTime(time);
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("3:X3+YY3");
+ processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult("3:X3+YY3");
- // go back to the time before expiration
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- time = 1000L - 100L - 1L;
- driver.setTime(time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult();
+ // go back to the time before expiration
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ time = 1000L - 100L - 1L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:X0+YY0");
+ processor.checkAndClearProcessResult();
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+ processor.checkAndClearProcessResult("0:X0+YY0");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- // clear (logically)
- time = 2000L;
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.setTime(time + i);
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult();
+ // clear (logically)
+ time = 2000L;
- // gradually expires items in w2
- // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.setTime(time + i);
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- time = 2000L + 100L;
- driver.setTime(time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ // gradually expires items in w2
+ // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ time = 2000L + 100L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("3:XX3+Y3");
+ processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult("3:XX3+Y3");
- // go back to the time before expiration
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- time = 2000L - 100L - 1L;
- driver.setTime(time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult();
+ // go back to the time before expiration
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ time = 2000L - 100L - 1L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0");
+ processor.checkAndClearProcessResult();
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+ processor.checkAndClearProcessResult("0:XX0+Y0");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
- } finally {
- Utils.delete(baseDir);
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 65226d3..65a4b54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -39,245 +41,240 @@ import static org.junit.Assert.assertEquals;
public class KStreamKStreamLeftJoinTest {
- private String topic1 = "topic1";
- private String topic2 = "topic2";
+ final private String topic1 = "topic1";
+ final private String topic2 = "topic2";
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String value1, String value2) {
- return value1 + "+" + value2;
- }
- };
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
- @Test
- public void testLeftJoin() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
- KStreamBuilder builder = new KStreamBuilder();
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStream<Integer, String> joined;
- MockProcessorSupplier<Integer, String> processor;
+ @Test
+ public void testLeftJoin() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
- processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(intSerde, stringSerde, topic1);
- stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
- joined.process(processor);
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+ joined.process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
- driver.setTime(0L);
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- // push two items to the primary stream. the other window is empty
- // w {}
- // --> w = {}
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ driver = new KStreamTestDriver(builder, stateDir);
+ driver.setTime(0L);
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+ // push two items to the primary stream. the other window is empty
+ // w {}
+ // --> w = {}
- // push two items to the other stream. this should produce two items.
- // w {}
- // --> w = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
- processor.checkAndClearProcessResult();
+ // push two items to the other stream. this should produce no items.
+ // w {}
+ // --> w = { 0:Y0, 1:Y1 }
- // push all four items to the primary stream. this should produce four items.
- // w = { 0:Y0, 1:Y1 }
- // --> w = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ // push all four items to the primary stream. this should produce four items.
+ // w = { 0:Y0, 1:Y1 }
+ // --> w = { 0:Y0, 1:Y1 }
- // push all items to the other stream. this should produce no items.
- // w = { 0:Y0, 1:Y1 }
- // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
- processor.checkAndClearProcessResult();
+ // push all items to the other stream. this should produce no items.
+ // w = { 0:Y0, 1:Y1 }
+ // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- // push all four items to the primary stream. this should produce four items.
- // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ // push all four items to the primary stream. this should produce six items.
+ // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- } finally {
- Utils.delete(baseDir);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
+
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
}
@Test
public void testWindowing() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
-
- long time = 0L;
-
- KStreamBuilder builder = new KStreamBuilder();
+ final KStreamBuilder builder = new KStreamBuilder();
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KStream<Integer, String> stream1;
- KStream<Integer, String> stream2;
- KStream<Integer, String> joined;
- MockProcessorSupplier<Integer, String> processor;
+ long time = 0L;
- processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(intSerde, stringSerde, topic1);
- stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
- joined.process(processor);
+ KStream<Integer, String> stream1;
+ KStream<Integer, String> stream2;
+ KStream<Integer, String> joined;
+ MockProcessorSupplier<Integer, String> processor;
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ processor = new MockProcessorSupplier<>();
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+ joined.process(processor);
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
- driver.setTime(time);
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- // push two items to the primary stream. the other window is empty. this should produce two items
- // w = {}
- // --> w = {}
+ driver = new KStreamTestDriver(builder, stateDir);
+ driver.setTime(time);
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ // push two items to the primary stream. the other window is empty. this should produce two items
+ // w = {}
+ // --> w = {}
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
- // push two items to the other stream. this should produce no items.
- // w = {}
- // --> w = { 0:Y0, 1:Y1 }
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
- processor.checkAndClearProcessResult();
+ // push two items to the other stream. this should produce no items.
+ // w = {}
+ // --> w = { 0:Y0, 1:Y1 }
- // clear logically
- time = 1000L;
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- // push all items to the other stream. this should produce no items.
- // w = {}
- // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.setTime(time + i);
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult();
+ // clear logically
+ time = 1000L;
- // gradually expire items in window.
- // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+ // push all items to the other stream. this should produce no items.
+ // w = {}
+ // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.setTime(time + i);
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- time = 1000L + 100L;
- driver.setTime(time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ // gradually expire items in window.
+ // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ time = 1000L + 100L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
- // go back to the time before expiration
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- time = 1000L - 100L - 1L;
- driver.setTime(time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+ // go back to the time before expiration
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ time = 1000L - 100L - 1L;
+ driver.setTime(time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
- driver.setTime(++time);
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
- } finally {
- Utils.delete(baseDir);
+ driver.setTime(++time);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 3acb59a..2c6108b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -20,19 +20,20 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -42,111 +43,105 @@ import static org.junit.Assert.assertEquals;
public class KStreamKTableLeftJoinTest {
- private String topic1 = "topic1";
- private String topic2 = "topic2";
+ final private String topic1 = "topic1";
+ final private String topic2 = "topic2";
- final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
- final private Serde<String> stringSerde = new Serdes.StringSerde();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
- private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String value1, String value2) {
- return value1 + "+" + value2;
+ private KStreamTestDriver driver = null;
+ private File stateDir = null;
+
+ @After
+ public void tearDown() {
+ if (driver != null) {
+ driver.close();
}
- };
+ driver = null;
+ }
- private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
- new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
- @Override
- public KeyValue<Integer, String> apply(Integer key, String value) {
- return KeyValue.pair(key, value);
- }
- };
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
@Test
public void testJoin() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
-
- KStreamBuilder builder = new KStreamBuilder();
-
- final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
- KStream<Integer, String> stream;
- KTable<Integer, String> table;
- MockProcessorSupplier<Integer, String> processor;
+ KStreamBuilder builder = new KStreamBuilder();
- processor = new MockProcessorSupplier<>();
- stream = builder.stream(intSerde, stringSerde, topic1);
- table = builder.table(intSerde, stringSerde, topic2);
- stream.leftJoin(table, joiner).process(processor);
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ KStream<Integer, String> stream;
+ KTable<Integer, String> table;
+ MockProcessorSupplier<Integer, String> processor;
- assertEquals(1, copartitionGroups.size());
- assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+ processor = new MockProcessorSupplier<>();
+ stream = builder.stream(intSerde, stringSerde, topic1);
+ table = builder.table(intSerde, stringSerde, topic2);
+ stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
- driver.setTime(0L);
+ Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- // push two items to the primary stream. the other table is empty
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ driver = new KStreamTestDriver(builder, stateDir);
+ driver.setTime(0L);
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+ // push two items to the primary stream. the other table is empty
- // push two items to the other stream. this should not produce any item.
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
- processor.checkAndClearProcessResult();
+ // push two items to the other stream. this should not produce any item.
- // push all four items to the primary stream. this should produce four items.
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ // push all four items to the primary stream. this should produce four items.
- // push all items to the other stream. this should not produce any item
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
- }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
- // push all four items to the primary stream. this should produce four items.
+ // push all items to the other stream. this should not produce any item
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ // push all four items to the primary stream. this should produce four items.
- // push two items with null to the other stream as deletes. this should not produce any item.
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ }
- for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], null);
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult();
+ // push two items with null to the other stream as deletes. this should not produce any item.
- // push all four items to the primary stream. this should produce four items.
+ for (int i = 0; i < 2; i++) {
+ driver.process(topic2, expectedKeys[i], null);
+ }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+ // push all four items to the primary stream. this should produce four items.
- } finally {
- Utils.delete(baseDir);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
+
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
}
@Test(expected = KafkaException.class)
@@ -158,10 +153,10 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(intSerde, stringSerde, topic1).map(keyValueMapper);
+ stream = builder.stream(intSerde, stringSerde, topic1).map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
table = builder.table(intSerde, stringSerde, topic2);
- stream.leftJoin(table, joiner).process(processor);
+ stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 68fa656..00e5d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -36,6 +37,16 @@ public class KStreamMapTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private KStreamTestDriver driver = null;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testMap() {
KStreamBuilder builder = new KStreamBuilder();
@@ -56,7 +67,7 @@ public class KStreamMapTest {
processor = new MockProcessorSupplier<>();
stream.map(mapper).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index e671aab..e48b677 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -35,6 +36,16 @@ public class KStreamMapValuesTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private KStreamTestDriver driver;
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void testFlatMapValues() {
KStreamBuilder builder = new KStreamBuilder();
@@ -54,7 +65,7 @@ public class KStreamMapValuesTest {
stream = builder.stream(intSerde, stringSerde, topicName);
stream.mapValues(mapper).process(processor);
- KStreamTestDriver driver = new KStreamTestDriver(builder);
+ driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i]));
}