You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/08/24 09:34:30 UTC

[1/2] flink git commit: [hotfix][tests] Implement AutoCloseable in TestHarness

Repository: flink
Updated Branches:
  refs/heads/master d21d5d632 -> 3f4de57b1


[hotfix][tests] Implement AutoCloseable in TestHarness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87e5b8be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87e5b8be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87e5b8be

Branch: refs/heads/master
Commit: 87e5b8be38455651f3cf6275d2fedb8a8ef52bc6
Parents: d21d5d6
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 14 15:09:39 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Aug 24 11:29:36 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/util/AbstractStreamOperatorTestHarness.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87e5b8be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 15802353..720346a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -90,7 +90,7 @@ import static org.mockito.Mockito.when;
 /**
  * Base class for {@code AbstractStreamOperator} test harnesses.
  */
-public class AbstractStreamOperatorTestHarness<OUT> {
+public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 
 	protected final StreamOperator<OUT> operator;
 


[2/2] flink git commit: [hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

Posted by al...@apache.org.
[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f4de57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f4de57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f4de57b

Branch: refs/heads/master
Commit: 3f4de57b1e2dfd532ed7b95805365eec340ebe64
Parents: 87e5b8b
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 17 15:46:47 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Aug 24 11:33:34 2017 +0200

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunctionTest.java    | 123 +++++++++++--------
 1 file changed, 75 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f4de57b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index e5bb630..9d01e74 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.BufferedWriter;
@@ -31,7 +33,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.Writer;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -43,56 +44,52 @@ import java.util.UUID;
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-	@Test
-	public void testNotifyOfCompletedCheckpoint() throws Exception {
-		File tmpDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_tmp").toFile();
-		File targetDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_target").toFile();
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestHarness(tmpDirectory, targetDirectory);
-
-		testHarness.setup();
-		testHarness.open();
-		testHarness.processElement("42", 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement("43", 2);
-		testHarness.snapshot(1, 3);
-		testHarness.processElement("44", 4);
-		testHarness.snapshot(2, 5);
-		testHarness.notifyOfCompletedCheckpoint(1);
-
-		assertExactlyOnceForDirectory(targetDirectory, Arrays.asList("42", "43"));
-		assertEquals(2, tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
-		testHarness.close();
+	TestContext context;
+
+	@Before
+	public void setUp() throws Exception {
+		context = new TestContext();
 	}
 
-	public OneInputStreamOperatorTestHarness<String, Object> createTestHarness(File tmpDirectory, File targetDirectory) throws Exception {
-		tmpDirectory.deleteOnExit();
-		targetDirectory.deleteOnExit();
-		FileBasedSinkFunction sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory);
-		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+	@After
+	public void tearDown() throws Exception {
+		context.close();
+	}
+
+	@Test
+	public void testNotifyOfCompletedCheckpoint() throws Exception {
+		context.harness.open();
+		context.harness.processElement("42", 0);
+		context.harness.snapshot(0, 1);
+		context.harness.processElement("43", 2);
+		context.harness.snapshot(1, 3);
+		context.harness.processElement("44", 4);
+		context.harness.snapshot(2, 5);
+		context.harness.notifyOfCompletedCheckpoint(1);
+
+		assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43"));
+		assertEquals(2, context.tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
 	}
 
 	@Test
 	public void testFailBeforeNotify() throws Exception {
-		File tmpDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_tmp").toFile();
-		File targetDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_target").toFile();
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestHarness(tmpDirectory, targetDirectory);
-
-		testHarness.setup();
-		testHarness.open();
-		testHarness.processElement("42", 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement("43", 2);
-		OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
-
-		assertTrue(tmpDirectory.setWritable(false));
+		context.harness.open();
+		context.harness.processElement("42", 0);
+		context.harness.snapshot(0, 1);
+		context.harness.processElement("43", 2);
+		OperatorStateHandles snapshot = context.harness.snapshot(1, 3);
+
+		assertTrue(context.tmpDirectory.setWritable(false));
 		try {
-			testHarness.processElement("44", 4);
-			testHarness.snapshot(2, 5);
+			context.harness.processElement("44", 4);
+			context.harness.snapshot(2, 5);
+			fail("something should fail");
 		}
 		catch (Exception ex) {
 			if (!(ex.getCause() instanceof FileNotFoundException)) {
@@ -100,17 +97,17 @@ public class TwoPhaseCommitSinkFunctionTest {
 			}
 			// ignore
 		}
-		testHarness.close();
+		context.close();
+
+		assertTrue(context.tmpDirectory.setWritable(true));
 
-		assertTrue(tmpDirectory.setWritable(true));
+		context.open();
+		context.harness.initializeState(snapshot);
 
-		testHarness = createTestHarness(tmpDirectory, targetDirectory);
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.close();
+		assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43"));
+		context.close();
 
-		assertExactlyOnceForDirectory(targetDirectory, Arrays.asList("42", "43"));
-		assertEquals(0, tmpDirectory.listFiles().length);
+		assertEquals(0, context.tmpDirectory.listFiles().length);
 	}
 
 	private void assertExactlyOnceForDirectory(File targetDirectory, List<String> expectedValues) throws IOException {
@@ -184,11 +181,41 @@ public class TwoPhaseCommitSinkFunctionTest {
 
 	private static class FileTransaction {
 		private final File tmpFile;
-		private final transient Writer writer;
+		private final transient BufferedWriter writer;
 
 		public FileTransaction(File tmpFile) throws IOException {
 			this.tmpFile = tmpFile;
 			this.writer = new BufferedWriter(new FileWriter(tmpFile));
 		}
+
+		@Override
+		public String toString() {
+			return String.format("FileTransaction[%s]", tmpFile.getName());
+		}
+	}
+
+	private static class TestContext implements AutoCloseable {
+		public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile();
+		public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile();
+
+		public FileBasedSinkFunction sinkFunction;
+		public OneInputStreamOperatorTestHarness<String, Object> harness;
+
+		private TestContext() throws Exception {
+			tmpDirectory.deleteOnExit();
+			targetDirectory.deleteOnExit();
+			open();
+		}
+
+		@Override
+		public void close() throws Exception {
+			harness.close();
+		}
+
+		public void open() throws Exception {
+			sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory);
+			harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+			harness.setup();
+		}
 	}
 }