You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 16:54:32 UTC
kafka git commit: MINOR: fix cleanup phase for KStreamWindowAggregateTest
Repository: kafka
Updated Branches:
refs/heads/trunk f54b61909 -> 800d29648
MINOR: fix cleanup phase for KStreamWindowAggregateTest
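Fixes the following exception, thrown by Utils.delete while the test removes its temporary state directory: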
```
java.nio.file.NoSuchFileException: /tmp/test7863510415433793941/topic2-Canonized/topic2-Canonized-197001010000/000015.sst
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:97)
at java.nio.file.Files.readAttributes(Files.java:1686)
at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:105)
at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:199)
at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:199)
at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:199)
at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:69)
at java.nio.file.Files.walkFileTree(Files.java:2602)
at java.nio.file.Files.walkFileTree(Files.java:2635)
at org.apache.kafka.common.utils.Utils.delete(Utils.java:555)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregateTest.testJoin(KStreamWindowAggregateTest.java:320)
```
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska <en...@confluent.io>, Damian Guy <da...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #2778 from mjsax/minor-fix-kstreamWindowAggregateTest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/800d2964
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/800d2964
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/800d2964
Branch: refs/heads/trunk
Commit: 800d29648bc370643d6dd1ead861792d7585c843
Parents: f54b619
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Apr 3 09:54:16 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 3 09:54:16 2017 -0700
----------------------------------------------------------------------
.../internals/KStreamWindowAggregateTest.java | 498 +++++++++----------
1 file changed, 243 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/800d2964/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 856cf73..24e0329 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -38,7 +38,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
@@ -49,112 +48,106 @@ public class KStreamWindowAggregateTest {
private KStreamTestDriver driver = null;
private File stateDir = null;
+ @Before
+ public void setUp() throws IOException {
+ stateDir = TestUtils.tempDirectory("kafka-test");
+ }
+
@After
- public void tearDown() {
+ public void tearDown() throws Exception {
if (driver != null) {
driver.close();
+ driver = null;
}
- driver = null;
- }
-
- @Before
- public void setUp() throws IOException {
- stateDir = TestUtils.tempDirectory("kafka-test");
+ Utils.delete(stateDir);
}
@Test
public void testAggBasic() throws Exception {
- final File baseDir = Files.createTempDirectory("test").toFile();
-
- try {
- final KStreamBuilder builder = new KStreamBuilder();
- String topic1 = "topic1";
-
- KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
- KTable<Windowed<String>, String> table2 =
- stream1.groupByKey(strSerde,
- strSerde)
- .aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- TimeWindows.of(10).advanceBy(5),
- strSerde, "topic1-Canonized");
-
- MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
- driver = new KStreamTestDriver(builder, baseDir);
-
- setRecordContext(0, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(1, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(2, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
- setRecordContext(3, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(4, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
-
- setRecordContext(5, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(6, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(7, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(8, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(9, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
- setRecordContext(10, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(11, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(12, topic1);
- driver.flushState();
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(13, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(14, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
-
-
- assertEquals(Utils.mkList(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1",
-
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3",
-
- "[A@5]:0+1+1", "[A@10]:0+1",
- "[B@5]:0+2+2+2", "[B@10]:0+2",
- "[D@5]:0+4+4", "[D@10]:0+4",
- "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
- "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
-
- } finally {
- Utils.delete(baseDir);
- }
+ final KStreamBuilder builder = new KStreamBuilder();
+ String topic1 = "topic1";
+
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+ KTable<Windowed<String>, String> table2 =
+ stream1.groupByKey(strSerde,
+ strSerde)
+ .aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ TimeWindows.of(10).advanceBy(5),
+ strSerde, "topic1-Canonized");
+
+ MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ driver = new KStreamTestDriver(builder, stateDir);
+
+ setRecordContext(0, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+ setRecordContext(1, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(2, topic1);
+ driver.process(topic1, "C", "3");
+ driver.flushState();
+ setRecordContext(3, topic1);
+ driver.process(topic1, "D", "4");
+ driver.flushState();
+ setRecordContext(4, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+
+ setRecordContext(5, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+ setRecordContext(6, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(7, topic1);
+ driver.process(topic1, "D", "4");
+ driver.flushState();
+ setRecordContext(8, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(9, topic1);
+ driver.process(topic1, "C", "3");
+ driver.flushState();
+ setRecordContext(10, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+ setRecordContext(11, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(12, topic1);
+ driver.flushState();
+ driver.process(topic1, "D", "4");
+ driver.flushState();
+ setRecordContext(13, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(14, topic1);
+ driver.process(topic1, "C", "3");
+ driver.flushState();
+
+
+ assertEquals(Utils.mkList(
+ "[A@0]:0+1",
+ "[B@0]:0+2",
+ "[C@0]:0+3",
+ "[D@0]:0+4",
+ "[A@0]:0+1+1",
+
+ "[A@0]:0+1+1+1", "[A@5]:0+1",
+ "[B@0]:0+2+2", "[B@5]:0+2",
+ "[D@0]:0+4+4", "[D@5]:0+4",
+ "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+ "[C@0]:0+3+3", "[C@5]:0+3",
+
+ "[A@5]:0+1+1", "[A@10]:0+1",
+ "[B@5]:0+2+2+2", "[B@10]:0+2",
+ "[D@5]:0+4+4", "[D@10]:0+4",
+ "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+ "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
}
private void setRecordContext(final long time, final String topic) {
@@ -163,161 +156,156 @@ public class KStreamWindowAggregateTest {
@Test
public void testJoin() throws Exception {
- final File baseDir = Files.createTempDirectory("test").toFile();
-
- try {
- final KStreamBuilder builder = new KStreamBuilder();
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
- KTable<Windowed<String>, String> table1 =
- stream1.groupByKey(strSerde, strSerde)
- .aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- TimeWindows.of(10).advanceBy(5),
- strSerde, "topic1-Canonized");
-
- MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
- table1.toStream().process(proc1);
-
- KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
- KTable<Windowed<String>, String> table2 =
- stream2.groupByKey(strSerde, strSerde)
- .aggregate(MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- TimeWindows.of(10).advanceBy(5),
- strSerde, "topic2-Canonized");
-
- MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
-
- MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
- table1.join(table2, new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String p1, String p2) {
- return p1 + "%" + p2;
- }
- }).toStream().process(proc3);
-
- driver = new KStreamTestDriver(builder, baseDir);
-
- setRecordContext(0, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(1, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(2, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
- setRecordContext(3, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(4, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
-
- proc1.checkAndClearProcessResult(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1"
- );
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult();
-
- setRecordContext(5, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(6, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(7, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(8, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(9, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
-
- proc1.checkAndClearProcessResult(
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3"
- );
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult();
-
- setRecordContext(0, topic1);
- driver.process(topic2, "A", "a");
- driver.flushState();
- setRecordContext(1, topic1);
- driver.process(topic2, "B", "b");
- driver.flushState();
- setRecordContext(2, topic1);
- driver.process(topic2, "C", "c");
- driver.flushState();
- setRecordContext(3, topic1);
- driver.process(topic2, "D", "d");
- driver.flushState();
- setRecordContext(4, topic1);
- driver.process(topic2, "A", "a");
- driver.flushState();
-
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
- "[A@0]:0+a",
- "[B@0]:0+b",
- "[C@0]:0+c",
- "[D@0]:0+d",
- "[A@0]:0+a+a"
- );
- proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a",
- "[B@0]:0+2+2+2%0+b",
- "[C@0]:0+3+3%0+c",
- "[D@0]:0+4+4%0+d",
- "[A@0]:0+1+1+1%0+a+a");
-
- setRecordContext(5, topic1);
- driver.process(topic2, "A", "a");
- driver.flushState();
- setRecordContext(6, topic1);
- driver.process(topic2, "B", "b");
- driver.flushState();
- setRecordContext(7, topic1);
- driver.process(topic2, "D", "d");
- driver.flushState();
- setRecordContext(8, topic1);
- driver.process(topic2, "B", "b");
- driver.flushState();
- setRecordContext(9, topic1);
- driver.process(topic2, "C", "c");
- driver.flushState();
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
- "[A@0]:0+a+a+a", "[A@5]:0+a",
- "[B@0]:0+b+b", "[B@5]:0+b",
- "[D@0]:0+d+d", "[D@5]:0+d",
- "[B@0]:0+b+b+b", "[B@5]:0+b+b",
- "[C@0]:0+c+c", "[C@5]:0+c"
- );
- proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
- "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
- "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
- "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
- "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
- );
- } finally {
- Utils.delete(baseDir);
- }
+ final KStreamBuilder builder = new KStreamBuilder();
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+ KTable<Windowed<String>, String> table1 =
+ stream1.groupByKey(strSerde, strSerde)
+ .aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ TimeWindows.of(10).advanceBy(5),
+ strSerde, "topic1-Canonized");
+
+ MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+ table1.toStream().process(proc1);
+
+ KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+ KTable<Windowed<String>, String> table2 =
+ stream2.groupByKey(strSerde, strSerde)
+ .aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ TimeWindows.of(10).advanceBy(5),
+ strSerde, "topic2-Canonized");
+
+ MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+
+ MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+ table1.join(table2, new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(String p1, String p2) {
+ return p1 + "%" + p2;
+ }
+ }).toStream().process(proc3);
+
+ driver = new KStreamTestDriver(builder, stateDir);
+
+ setRecordContext(0, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+ setRecordContext(1, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(2, topic1);
+ driver.process(topic1, "C", "3");
+ driver.flushState();
+ setRecordContext(3, topic1);
+ driver.process(topic1, "D", "4");
+ driver.flushState();
+ setRecordContext(4, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+
+ proc1.checkAndClearProcessResult(
+ "[A@0]:0+1",
+ "[B@0]:0+2",
+ "[C@0]:0+3",
+ "[D@0]:0+4",
+ "[A@0]:0+1+1"
+ );
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult();
+
+ setRecordContext(5, topic1);
+ driver.process(topic1, "A", "1");
+ driver.flushState();
+ setRecordContext(6, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(7, topic1);
+ driver.process(topic1, "D", "4");
+ driver.flushState();
+ setRecordContext(8, topic1);
+ driver.process(topic1, "B", "2");
+ driver.flushState();
+ setRecordContext(9, topic1);
+ driver.process(topic1, "C", "3");
+ driver.flushState();
+
+ proc1.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1", "[A@5]:0+1",
+ "[B@0]:0+2+2", "[B@5]:0+2",
+ "[D@0]:0+4+4", "[D@5]:0+4",
+ "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+ "[C@0]:0+3+3", "[C@5]:0+3"
+ );
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult();
+
+ setRecordContext(0, topic1);
+ driver.process(topic2, "A", "a");
+ driver.flushState();
+ setRecordContext(1, topic1);
+ driver.process(topic2, "B", "b");
+ driver.flushState();
+ setRecordContext(2, topic1);
+ driver.process(topic2, "C", "c");
+ driver.flushState();
+ setRecordContext(3, topic1);
+ driver.process(topic2, "D", "d");
+ driver.flushState();
+ setRecordContext(4, topic1);
+ driver.process(topic2, "A", "a");
+ driver.flushState();
+
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
+ "[A@0]:0+a",
+ "[B@0]:0+b",
+ "[C@0]:0+c",
+ "[D@0]:0+d",
+ "[A@0]:0+a+a"
+ );
+ proc3.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1%0+a",
+ "[B@0]:0+2+2+2%0+b",
+ "[C@0]:0+3+3%0+c",
+ "[D@0]:0+4+4%0+d",
+ "[A@0]:0+1+1+1%0+a+a");
+
+ setRecordContext(5, topic1);
+ driver.process(topic2, "A", "a");
+ driver.flushState();
+ setRecordContext(6, topic1);
+ driver.process(topic2, "B", "b");
+ driver.flushState();
+ setRecordContext(7, topic1);
+ driver.process(topic2, "D", "d");
+ driver.flushState();
+ setRecordContext(8, topic1);
+ driver.process(topic2, "B", "b");
+ driver.flushState();
+ setRecordContext(9, topic1);
+ driver.process(topic2, "C", "c");
+ driver.flushState();
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
+ "[A@0]:0+a+a+a", "[A@5]:0+a",
+ "[B@0]:0+b+b", "[B@5]:0+b",
+ "[D@0]:0+d+d", "[D@5]:0+d",
+ "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+ "[C@0]:0+c+c", "[C@5]:0+c"
+ );
+ proc3.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+ "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+ "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+ "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+ "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+ );
}
+
}