You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/29 17:49:21 UTC

[flink] branch master updated: [FLINK-19437][tests] Fix unstable test FileSourceTextLinesITCase.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d999126  [FLINK-19437][tests] Fix unstable test FileSourceTextLinesITCase.
d999126 is described below

commit d9991269d38feaef7165534fc29563e6ce7f4bce
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 29 17:19:17 2020 +0200

    [FLINK-19437][tests] Fix unstable test FileSourceTextLinesITCase.
    
    The test was unstable because test data files were not written atomically. As a result, the
    streaming source could ingest partially written files, leading to unexpected results.
    
    This fix makes file writing atomic by first writing to a hidden temp file and then renaming that
    temp file to the final file path.
---
 .../file/src/FileSourceTextLinesITCase.java        | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 8876e7e..0c92cbc 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -242,14 +242,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
 
 	private static void writeFile(File testDir, int num) throws IOException {
 		final File file = new File(testDir, FILE_PATHS[num]);
-		final File parent = file.getParentFile();
-		assertTrue(parent.mkdirs() || parent.exists());
-
-		try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
-			for (String line : LINES_PER_FILE[num]) {
-				writer.println(line);
-			}
-		}
+		writeFileAtomically(file, LINES_PER_FILE[num]);
 	}
 
 	private static void writeAllFiles(File testDir) throws IOException {
@@ -259,15 +252,28 @@ public class FileSourceTextLinesITCase extends TestLogger {
 	}
 
 	private static void writeHiddenJunkFiles(File testDir) throws IOException {
+		final String[] junkContents = new String[] {
+			"This should not end up in the test result.",
+			"Foo bar bazzl junk"
+		};
+
 		for (String junkPath : HIDDEN_JUNK_PATHS) {
 			final File file = new File(testDir, junkPath);
-			final File parent = file.getParentFile();
-			assertTrue(parent.mkdirs() || parent.exists());
+			writeFileAtomically(file, junkContents);
+		}
+	}
 
-			try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
-				writer.println("This should not end up in the test result.");
-				writer.println("Foo bar bazzl junk");
+	private static void writeFileAtomically(File file, String[] lines) throws IOException {
+		final File parent = file.getParentFile();
+		final File stagingFile = new File(parent, ".tmp-" + file.getName());
+		assertTrue(parent.mkdirs() || parent.exists());
+
+		try (PrintWriter writer = new PrintWriter(new FileWriter(stagingFile))) {
+			for (String line : lines) {
+				writer.println(line);
 			}
 		}
+
+		assertTrue(stagingFile.renameTo(file));
 	}
 }