You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/28 13:32:20 UTC

[3/4] flink git commit: [hotfix] Flush in CsvOutputFormat before closing, to increase CI stability

[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d13a05d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d13a05d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d13a05d

Branch: refs/heads/master
Commit: 3d13a05d1b2354e027626db280f9bfce9070e570
Parents: 8e76322
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 23 15:37:05 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/CsvOutputFormat.java      |  1 +
 .../flink/api/java/io/CsvOutputFormatTest.java  | 47 +++++++++++++-------
 2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 703128f..c2fe13c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -165,6 +165,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	@Override
 	public void close() throws IOException {
 		if (wrt != null) {
+			this.wrt.flush();
 			this.wrt.close();
 		}
 		super.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a9288c6..a8ce495 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -34,25 +35,30 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
 
+import static org.junit.Assert.fail;
+
 public class CsvOutputFormatTest {
 
 	private String path = null;
-	private CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;
 
 	@Before
 	public void createFile() throws Exception {
 		path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
-		csvOutputFormat = new CsvOutputFormat<>(new Path(path));
 	}
 
 	@Test
 	public void testNullAllow() throws Exception {
-		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
-		csvOutputFormat.setAllowNullValues(true);
-		csvOutputFormat.open(0, 1);
-		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
-		csvOutputFormat.close();
+		final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+		try {
+			csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+			csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+			csvOutputFormat.setAllowNullValues(true);
+			csvOutputFormat.open(0, 1);
+			csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		}
+		finally {
+			csvOutputFormat.close();
+		}
 
 		java.nio.file.Path p = Paths.get(path);
 		Assert.assertTrue(Files.exists(p));
@@ -61,19 +67,28 @@ public class CsvOutputFormatTest {
 		Assert.assertEquals("One,,8", lines.get(0));
 	}
 
-	@Test(expected = RuntimeException.class)
+	@Test
 	public void testNullDisallowOnDefault() throws Exception {
-		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
-		csvOutputFormat.open(0, 1);
-		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
-		csvOutputFormat.close();
+		final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+		try {
+			csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+			csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+			csvOutputFormat.open(0, 1);
+			try {
+				csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+				fail("should fail with an exception");
+			} catch (RuntimeException e) {
+				// expected
+			}
+			
+		}
+		finally {
+			csvOutputFormat.close();
+		}
 	}
 
 	@After
 	public void cleanUp() throws IOException {
-		csvOutputFormat.close();
 		Files.deleteIfExists(Paths.get(path));
 	}
-
 }