You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 16:54:32 UTC

kafka git commit: MINOR: fix cleanup phase for KStreamWindowAggregateTest

Repository: kafka
Updated Branches:
  refs/heads/trunk f54b61909 -> 800d29648


MINOR: fix cleanup phase for KStreamWindowAggregateTest

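Fixes the following exception, which was thrown when a test deleted its state directory in a `finally` block before the test driver had been closed in `tearDown()`; the directory is now deleted in `tearDown()` after the driver is closed: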
```
java.nio.file.NoSuchFileException: /tmp/test7863510415433793941/topic2-Canonized/topic2-Canonized-197001010000/000015.sst
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
	at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
	at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:97)
	at java.nio.file.Files.readAttributes(Files.java:1686)
	at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:105)
	at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:199)
	at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:199)
	at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:199)
	at java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:69)
	at java.nio.file.Files.walkFileTree(Files.java:2602)
	at java.nio.file.Files.walkFileTree(Files.java:2635)
	at org.apache.kafka.common.utils.Utils.delete(Utils.java:555)
	at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregateTest.testJoin(KStreamWindowAggregateTest.java:320)
```

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Eno Thereska <en...@confluent.io>, Damian Guy <da...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #2778 from mjsax/minor-fix-kstreamWindowAggregateTest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/800d2964
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/800d2964
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/800d2964

Branch: refs/heads/trunk
Commit: 800d29648bc370643d6dd1ead861792d7585c843
Parents: f54b619
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Apr 3 09:54:16 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 3 09:54:16 2017 -0700

----------------------------------------------------------------------
 .../internals/KStreamWindowAggregateTest.java   | 498 +++++++++----------
 1 file changed, 243 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/800d2964/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 856cf73..24e0329 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -38,7 +38,6 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
@@ -49,112 +48,106 @@ public class KStreamWindowAggregateTest {
     private KStreamTestDriver driver = null;
     private File stateDir = null;
 
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
+
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
         if (driver != null) {
             driver.close();
+            driver = null;
         }
-        driver = null;
-    }
-
-    @Before
-    public void setUp() throws IOException {
-        stateDir = TestUtils.tempDirectory("kafka-test");
+        Utils.delete(stateDir);
     }
 
     @Test
     public void testAggBasic() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
-
-            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
-            KTable<Windowed<String>, String> table2 =
-                stream1.groupByKey(strSerde,
-                                   strSerde)
-                    .aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.TOSTRING_ADDER,
-                               TimeWindows.of(10).advanceBy(5),
-                               strSerde, "topic1-Canonized");
-
-            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
-
-            driver = new KStreamTestDriver(builder, baseDir);
-
-            setRecordContext(0, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-            setRecordContext(1, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(2, topic1);
-            driver.process(topic1, "C", "3");
-            driver.flushState();
-            setRecordContext(3, topic1);
-            driver.process(topic1, "D", "4");
-            driver.flushState();
-            setRecordContext(4, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-
-            setRecordContext(5, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-            setRecordContext(6, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(7, topic1);
-            driver.process(topic1, "D", "4");
-            driver.flushState();
-            setRecordContext(8, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(9, topic1);
-            driver.process(topic1, "C", "3");
-            driver.flushState();
-            setRecordContext(10, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-            setRecordContext(11, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(12, topic1);
-            driver.flushState();
-            driver.process(topic1, "D", "4");
-            driver.flushState();
-            setRecordContext(13, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(14, topic1);
-            driver.process(topic1, "C", "3");
-            driver.flushState();
-
-
-            assertEquals(Utils.mkList(
-                    "[A@0]:0+1",
-                    "[B@0]:0+2",
-                    "[C@0]:0+3",
-                    "[D@0]:0+4",
-                    "[A@0]:0+1+1",
-
-                    "[A@0]:0+1+1+1", "[A@5]:0+1",
-                    "[B@0]:0+2+2", "[B@5]:0+2",
-                    "[D@0]:0+4+4", "[D@5]:0+4",
-                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                    "[C@0]:0+3+3", "[C@5]:0+3",
-
-                    "[A@5]:0+1+1", "[A@10]:0+1",
-                    "[B@5]:0+2+2+2", "[B@10]:0+2",
-                    "[D@5]:0+4+4", "[D@10]:0+4",
-                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
-                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
-
-        } finally {
-            Utils.delete(baseDir);
-        }
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+
+        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+        KTable<Windowed<String>, String> table2 =
+            stream1.groupByKey(strSerde,
+                               strSerde)
+                .aggregate(MockInitializer.STRING_INIT,
+                           MockAggregator.TOSTRING_ADDER,
+                           TimeWindows.of(10).advanceBy(5),
+                           strSerde, "topic1-Canonized");
+
+        MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        setRecordContext(0, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+        setRecordContext(1, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(2, topic1);
+        driver.process(topic1, "C", "3");
+        driver.flushState();
+        setRecordContext(3, topic1);
+        driver.process(topic1, "D", "4");
+        driver.flushState();
+        setRecordContext(4, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+
+        setRecordContext(5, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+        setRecordContext(6, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(7, topic1);
+        driver.process(topic1, "D", "4");
+        driver.flushState();
+        setRecordContext(8, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(9, topic1);
+        driver.process(topic1, "C", "3");
+        driver.flushState();
+        setRecordContext(10, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+        setRecordContext(11, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(12, topic1);
+        driver.flushState();
+        driver.process(topic1, "D", "4");
+        driver.flushState();
+        setRecordContext(13, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(14, topic1);
+        driver.process(topic1, "C", "3");
+        driver.flushState();
+
+
+        assertEquals(Utils.mkList(
+                "[A@0]:0+1",
+                "[B@0]:0+2",
+                "[C@0]:0+3",
+                "[D@0]:0+4",
+                "[A@0]:0+1+1",
+
+                "[A@0]:0+1+1+1", "[A@5]:0+1",
+                "[B@0]:0+2+2", "[B@5]:0+2",
+                "[D@0]:0+4+4", "[D@5]:0+4",
+                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                "[C@0]:0+3+3", "[C@5]:0+3",
+
+                "[A@5]:0+1+1", "[A@10]:0+1",
+                "[B@5]:0+2+2+2", "[B@10]:0+2",
+                "[D@5]:0+4+4", "[D@10]:0+4",
+                "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+                "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
     }
 
     private void setRecordContext(final long time, final String topic) {
@@ -163,161 +156,156 @@ public class KStreamWindowAggregateTest {
 
     @Test
     public void testJoin() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
-            KTable<Windowed<String>, String> table1 =
-                stream1.groupByKey(strSerde, strSerde)
-                    .aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.TOSTRING_ADDER,
-                               TimeWindows.of(10).advanceBy(5),
-                               strSerde, "topic1-Canonized");
-
-            MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
-            table1.toStream().process(proc1);
-
-            KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
-            KTable<Windowed<String>, String> table2 =
-                stream2.groupByKey(strSerde, strSerde)
-                    .aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.TOSTRING_ADDER,
-                               TimeWindows.of(10).advanceBy(5),
-                               strSerde, "topic2-Canonized");
-
-            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
-
-
-            MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
-            table1.join(table2, new ValueJoiner<String, String, String>() {
-                @Override
-                public String apply(String p1, String p2) {
-                    return p1 + "%" + p2;
-                }
-            }).toStream().process(proc3);
-
-            driver = new KStreamTestDriver(builder, baseDir);
-
-            setRecordContext(0, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-            setRecordContext(1, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(2, topic1);
-            driver.process(topic1, "C", "3");
-            driver.flushState();
-            setRecordContext(3, topic1);
-            driver.process(topic1, "D", "4");
-            driver.flushState();
-            setRecordContext(4, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-
-            proc1.checkAndClearProcessResult(
-                    "[A@0]:0+1",
-                    "[B@0]:0+2",
-                    "[C@0]:0+3",
-                    "[D@0]:0+4",
-                    "[A@0]:0+1+1"
-            );
-            proc2.checkAndClearProcessResult();
-            proc3.checkAndClearProcessResult();
-
-            setRecordContext(5, topic1);
-            driver.process(topic1, "A", "1");
-            driver.flushState();
-            setRecordContext(6, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(7, topic1);
-            driver.process(topic1, "D", "4");
-            driver.flushState();
-            setRecordContext(8, topic1);
-            driver.process(topic1, "B", "2");
-            driver.flushState();
-            setRecordContext(9, topic1);
-            driver.process(topic1, "C", "3");
-            driver.flushState();
-
-            proc1.checkAndClearProcessResult(
-                    "[A@0]:0+1+1+1", "[A@5]:0+1",
-                    "[B@0]:0+2+2", "[B@5]:0+2",
-                    "[D@0]:0+4+4", "[D@5]:0+4",
-                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                    "[C@0]:0+3+3", "[C@5]:0+3"
-            );
-            proc2.checkAndClearProcessResult();
-            proc3.checkAndClearProcessResult();
-
-            setRecordContext(0, topic1);
-            driver.process(topic2, "A", "a");
-            driver.flushState();
-            setRecordContext(1, topic1);
-            driver.process(topic2, "B", "b");
-            driver.flushState();
-            setRecordContext(2, topic1);
-            driver.process(topic2, "C", "c");
-            driver.flushState();
-            setRecordContext(3, topic1);
-            driver.process(topic2, "D", "d");
-            driver.flushState();
-            setRecordContext(4, topic1);
-            driver.process(topic2, "A", "a");
-            driver.flushState();
-
-            proc1.checkAndClearProcessResult();
-            proc2.checkAndClearProcessResult(
-                    "[A@0]:0+a",
-                    "[B@0]:0+b",
-                    "[C@0]:0+c",
-                    "[D@0]:0+d",
-                    "[A@0]:0+a+a"
-            );
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:0+1+1+1%0+a",
-                    "[B@0]:0+2+2+2%0+b",
-                    "[C@0]:0+3+3%0+c",
-                    "[D@0]:0+4+4%0+d",
-                    "[A@0]:0+1+1+1%0+a+a");
-
-            setRecordContext(5, topic1);
-            driver.process(topic2, "A", "a");
-            driver.flushState();
-            setRecordContext(6, topic1);
-            driver.process(topic2, "B", "b");
-            driver.flushState();
-            setRecordContext(7, topic1);
-            driver.process(topic2, "D", "d");
-            driver.flushState();
-            setRecordContext(8, topic1);
-            driver.process(topic2, "B", "b");
-            driver.flushState();
-            setRecordContext(9, topic1);
-            driver.process(topic2, "C", "c");
-            driver.flushState();
-            proc1.checkAndClearProcessResult();
-            proc2.checkAndClearProcessResult(
-                    "[A@0]:0+a+a+a", "[A@5]:0+a",
-                    "[B@0]:0+b+b", "[B@5]:0+b",
-                    "[D@0]:0+d+d", "[D@5]:0+d",
-                    "[B@0]:0+b+b+b", "[B@5]:0+b+b",
-                    "[C@0]:0+c+c", "[C@5]:0+c"
-            );
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
-                    "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
-                    "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
-                    "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
-                    "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
-            );
-        } finally {
-            Utils.delete(baseDir);
-        }
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+        KTable<Windowed<String>, String> table1 =
+            stream1.groupByKey(strSerde, strSerde)
+                .aggregate(MockInitializer.STRING_INIT,
+                           MockAggregator.TOSTRING_ADDER,
+                           TimeWindows.of(10).advanceBy(5),
+                           strSerde, "topic1-Canonized");
+
+        MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+        table1.toStream().process(proc1);
+
+        KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+        KTable<Windowed<String>, String> table2 =
+            stream2.groupByKey(strSerde, strSerde)
+                .aggregate(MockInitializer.STRING_INIT,
+                           MockAggregator.TOSTRING_ADDER,
+                           TimeWindows.of(10).advanceBy(5),
+                           strSerde, "topic2-Canonized");
+
+        MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+
+        MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+        table1.join(table2, new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(String p1, String p2) {
+                return p1 + "%" + p2;
+            }
+        }).toStream().process(proc3);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        setRecordContext(0, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+        setRecordContext(1, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(2, topic1);
+        driver.process(topic1, "C", "3");
+        driver.flushState();
+        setRecordContext(3, topic1);
+        driver.process(topic1, "D", "4");
+        driver.flushState();
+        setRecordContext(4, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+
+        proc1.checkAndClearProcessResult(
+                "[A@0]:0+1",
+                "[B@0]:0+2",
+                "[C@0]:0+3",
+                "[D@0]:0+4",
+                "[A@0]:0+1+1"
+        );
+        proc2.checkAndClearProcessResult();
+        proc3.checkAndClearProcessResult();
+
+        setRecordContext(5, topic1);
+        driver.process(topic1, "A", "1");
+        driver.flushState();
+        setRecordContext(6, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(7, topic1);
+        driver.process(topic1, "D", "4");
+        driver.flushState();
+        setRecordContext(8, topic1);
+        driver.process(topic1, "B", "2");
+        driver.flushState();
+        setRecordContext(9, topic1);
+        driver.process(topic1, "C", "3");
+        driver.flushState();
+
+        proc1.checkAndClearProcessResult(
+                "[A@0]:0+1+1+1", "[A@5]:0+1",
+                "[B@0]:0+2+2", "[B@5]:0+2",
+                "[D@0]:0+4+4", "[D@5]:0+4",
+                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                "[C@0]:0+3+3", "[C@5]:0+3"
+        );
+        proc2.checkAndClearProcessResult();
+        proc3.checkAndClearProcessResult();
+
+        setRecordContext(0, topic1);
+        driver.process(topic2, "A", "a");
+        driver.flushState();
+        setRecordContext(1, topic1);
+        driver.process(topic2, "B", "b");
+        driver.flushState();
+        setRecordContext(2, topic1);
+        driver.process(topic2, "C", "c");
+        driver.flushState();
+        setRecordContext(3, topic1);
+        driver.process(topic2, "D", "d");
+        driver.flushState();
+        setRecordContext(4, topic1);
+        driver.process(topic2, "A", "a");
+        driver.flushState();
+
+        proc1.checkAndClearProcessResult();
+        proc2.checkAndClearProcessResult(
+                "[A@0]:0+a",
+                "[B@0]:0+b",
+                "[C@0]:0+c",
+                "[D@0]:0+d",
+                "[A@0]:0+a+a"
+        );
+        proc3.checkAndClearProcessResult(
+                "[A@0]:0+1+1+1%0+a",
+                "[B@0]:0+2+2+2%0+b",
+                "[C@0]:0+3+3%0+c",
+                "[D@0]:0+4+4%0+d",
+                "[A@0]:0+1+1+1%0+a+a");
+
+        setRecordContext(5, topic1);
+        driver.process(topic2, "A", "a");
+        driver.flushState();
+        setRecordContext(6, topic1);
+        driver.process(topic2, "B", "b");
+        driver.flushState();
+        setRecordContext(7, topic1);
+        driver.process(topic2, "D", "d");
+        driver.flushState();
+        setRecordContext(8, topic1);
+        driver.process(topic2, "B", "b");
+        driver.flushState();
+        setRecordContext(9, topic1);
+        driver.process(topic2, "C", "c");
+        driver.flushState();
+        proc1.checkAndClearProcessResult();
+        proc2.checkAndClearProcessResult(
+                "[A@0]:0+a+a+a", "[A@5]:0+a",
+                "[B@0]:0+b+b", "[B@5]:0+b",
+                "[D@0]:0+d+d", "[D@5]:0+d",
+                "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+                "[C@0]:0+c+c", "[C@5]:0+c"
+        );
+        proc3.checkAndClearProcessResult(
+                "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+                "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+                "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+                "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+                "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+        );
     }
+
 }