You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/08/24 09:34:30 UTC
[1/2] flink git commit: [hotfix][tests] Implement AutoCloseable in TestHarness
Repository: flink
Updated Branches:
refs/heads/master d21d5d632 -> 3f4de57b1
[hotfix][tests] Implement AutoCloseable in TestHarness
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87e5b8be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87e5b8be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87e5b8be
Branch: refs/heads/master
Commit: 87e5b8be38455651f3cf6275d2fedb8a8ef52bc6
Parents: d21d5d6
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 14 15:09:39 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Aug 24 11:29:36 2017 +0200
----------------------------------------------------------------------
.../flink/streaming/util/AbstractStreamOperatorTestHarness.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87e5b8be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 15802353..720346a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -90,7 +90,7 @@ import static org.mockito.Mockito.when;
/**
* Base class for {@code AbstractStreamOperator} test harnesses.
*/
-public class AbstractStreamOperatorTestHarness<OUT> {
+public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
protected final StreamOperator<OUT> operator;
[2/2] flink git commit: [hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest
Posted by al...@apache.org.
[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f4de57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f4de57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f4de57b
Branch: refs/heads/master
Commit: 3f4de57b1e2dfd532ed7b95805365eec340ebe64
Parents: 87e5b8b
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 17 15:46:47 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Aug 24 11:33:34 2017 +0200
----------------------------------------------------------------------
.../sink/TwoPhaseCommitSinkFunctionTest.java | 123 +++++++++++--------
1 file changed, 75 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3f4de57b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index e5bb630..9d01e74 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.BufferedWriter;
@@ -31,7 +33,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -43,56 +44,52 @@ import java.util.UUID;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for {@link TwoPhaseCommitSinkFunction}.
*/
public class TwoPhaseCommitSinkFunctionTest {
- @Test
- public void testNotifyOfCompletedCheckpoint() throws Exception {
- File tmpDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_tmp").toFile();
- File targetDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_target").toFile();
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestHarness(tmpDirectory, targetDirectory);
-
- testHarness.setup();
- testHarness.open();
- testHarness.processElement("42", 0);
- testHarness.snapshot(0, 1);
- testHarness.processElement("43", 2);
- testHarness.snapshot(1, 3);
- testHarness.processElement("44", 4);
- testHarness.snapshot(2, 5);
- testHarness.notifyOfCompletedCheckpoint(1);
-
- assertExactlyOnceForDirectory(targetDirectory, Arrays.asList("42", "43"));
- assertEquals(2, tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
- testHarness.close();
+ TestContext context;
+
+ @Before
+ public void setUp() throws Exception {
+ context = new TestContext();
}
- public OneInputStreamOperatorTestHarness<String, Object> createTestHarness(File tmpDirectory, File targetDirectory) throws Exception {
- tmpDirectory.deleteOnExit();
- targetDirectory.deleteOnExit();
- FileBasedSinkFunction sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory);
- return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+ @After
+ public void tearDown() throws Exception {
+ context.close();
+ }
+
+ @Test
+ public void testNotifyOfCompletedCheckpoint() throws Exception {
+ context.harness.open();
+ context.harness.processElement("42", 0);
+ context.harness.snapshot(0, 1);
+ context.harness.processElement("43", 2);
+ context.harness.snapshot(1, 3);
+ context.harness.processElement("44", 4);
+ context.harness.snapshot(2, 5);
+ context.harness.notifyOfCompletedCheckpoint(1);
+
+ assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43"));
+ assertEquals(2, context.tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
}
@Test
public void testFailBeforeNotify() throws Exception {
- File tmpDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_tmp").toFile();
- File targetDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_target").toFile();
- OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestHarness(tmpDirectory, targetDirectory);
-
- testHarness.setup();
- testHarness.open();
- testHarness.processElement("42", 0);
- testHarness.snapshot(0, 1);
- testHarness.processElement("43", 2);
- OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
-
- assertTrue(tmpDirectory.setWritable(false));
+ context.harness.open();
+ context.harness.processElement("42", 0);
+ context.harness.snapshot(0, 1);
+ context.harness.processElement("43", 2);
+ OperatorStateHandles snapshot = context.harness.snapshot(1, 3);
+
+ assertTrue(context.tmpDirectory.setWritable(false));
try {
- testHarness.processElement("44", 4);
- testHarness.snapshot(2, 5);
+ context.harness.processElement("44", 4);
+ context.harness.snapshot(2, 5);
+ fail("something should fail");
}
catch (Exception ex) {
if (!(ex.getCause() instanceof FileNotFoundException)) {
@@ -100,17 +97,17 @@ public class TwoPhaseCommitSinkFunctionTest {
}
// ignore
}
- testHarness.close();
+ context.close();
+
+ assertTrue(context.tmpDirectory.setWritable(true));
- assertTrue(tmpDirectory.setWritable(true));
+ context.open();
+ context.harness.initializeState(snapshot);
- testHarness = createTestHarness(tmpDirectory, targetDirectory);
- testHarness.setup();
- testHarness.initializeState(snapshot);
- testHarness.close();
+ assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43"));
+ context.close();
- assertExactlyOnceForDirectory(targetDirectory, Arrays.asList("42", "43"));
- assertEquals(0, tmpDirectory.listFiles().length);
+ assertEquals(0, context.tmpDirectory.listFiles().length);
}
private void assertExactlyOnceForDirectory(File targetDirectory, List<String> expectedValues) throws IOException {
@@ -184,11 +181,41 @@ public class TwoPhaseCommitSinkFunctionTest {
private static class FileTransaction {
private final File tmpFile;
- private final transient Writer writer;
+ private final transient BufferedWriter writer;
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new FileWriter(tmpFile));
}
+
+ @Override
+ public String toString() {
+ return String.format("FileTransaction[%s]", tmpFile.getName());
+ }
+ }
+
+ private static class TestContext implements AutoCloseable {
+ public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile();
+ public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile();
+
+ public FileBasedSinkFunction sinkFunction;
+ public OneInputStreamOperatorTestHarness<String, Object> harness;
+
+ private TestContext() throws Exception {
+ tmpDirectory.deleteOnExit();
+ targetDirectory.deleteOnExit();
+ open();
+ }
+
+ @Override
+ public void close() throws Exception {
+ harness.close();
+ }
+
+ public void open() throws Exception {
+ sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory);
+ harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+ harness.setup();
+ }
}
}