You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/28 13:32:20 UTC
[3/4] flink git commit: [hotfix] Flush in CsvOutputFormat before closing, to increase CI stability
[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d13a05d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d13a05d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d13a05d
Branch: refs/heads/master
Commit: 3d13a05d1b2354e027626db280f9bfce9070e570
Parents: 8e76322
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 23 15:37:05 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100
----------------------------------------------------------------------
.../flink/api/java/io/CsvOutputFormat.java | 1 +
.../flink/api/java/io/CsvOutputFormatTest.java | 47 +++++++++++++-------
2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 703128f..c2fe13c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -165,6 +165,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
@Override
public void close() throws IOException {
if (wrt != null) {
+ this.wrt.flush();
this.wrt.close();
}
super.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a9288c6..a8ce495 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -34,25 +35,30 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
+import static org.junit.Assert.fail;
+
public class CsvOutputFormatTest {
private String path = null;
- private CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;
@Before
public void createFile() throws Exception {
path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
- csvOutputFormat = new CsvOutputFormat<>(new Path(path));
}
@Test
public void testNullAllow() throws Exception {
- csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
- csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
- csvOutputFormat.setAllowNullValues(true);
- csvOutputFormat.open(0, 1);
- csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
- csvOutputFormat.close();
+ final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+ try {
+ csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+ csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+ csvOutputFormat.setAllowNullValues(true);
+ csvOutputFormat.open(0, 1);
+ csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+ }
+ finally {
+ csvOutputFormat.close();
+ }
java.nio.file.Path p = Paths.get(path);
Assert.assertTrue(Files.exists(p));
@@ -61,19 +67,28 @@ public class CsvOutputFormatTest {
Assert.assertEquals("One,,8", lines.get(0));
}
- @Test(expected = RuntimeException.class)
+ @Test
public void testNullDisallowOnDefault() throws Exception {
- csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
- csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
- csvOutputFormat.open(0, 1);
- csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
- csvOutputFormat.close();
+ final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+ try {
+ csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+ csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+ csvOutputFormat.open(0, 1);
+ try {
+ csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+ fail("should fail with an exception");
+ } catch (RuntimeException e) {
+ // expected
+ }
+
+ }
+ finally {
+ csvOutputFormat.close();
+ }
}
@After
public void cleanUp() throws IOException {
- csvOutputFormat.close();
Files.deleteIfExists(Paths.get(path));
}
-
}