You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/15 22:23:48 UTC

[6/6] flink git commit: [FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream

[FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a912e60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a912e60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a912e60

Branch: refs/heads/master
Commit: 7a912e603a040acfc46d394e86794b2c031298cb
Parents: 9d238e1
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 15:13:21 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:44 2018 +0200

----------------------------------------------------------------------
 .../LocalRecoverableFsDataOutputStream.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a912e60/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index fd8e8fe..df9288e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -27,10 +27,14 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
 import java.nio.file.AtomicMoveNotSupportedException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -44,12 +48,16 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
 	private final File tempFile;
 
-	private final FileOutputStream fos;
+	private final FileChannel fileChannel;
+
+	private final OutputStream fos;
 
 	LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
 		this.targetFile = checkNotNull(targetFile);
 		this.tempFile = checkNotNull(tempFile);
-		this.fos = new FileOutputStream(tempFile);
+
+		this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+		this.fos = Channels.newOutputStream(fileChannel);
 	}
 
 	LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
@@ -57,15 +65,15 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 		this.tempFile = checkNotNull(resumable.tempFile());
 
 		if (!tempFile.exists()) {
-			throw new FileNotFoundException("File Not Found: " + tempFile);
+			throw new FileNotFoundException("File Not Found: " + tempFile.getAbsolutePath());
 		}
 
-		if (tempFile.length() < resumable.offset()) {
-			throw new IOException("Missing data in tmp file: " + tempFile);
+		this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+		if (this.fileChannel.position() < resumable.offset()) {
+			throw new IOException("Missing data in tmp file: " + tempFile.getAbsolutePath());
 		}
-
-		this.fos = new FileOutputStream(this.tempFile, true);
-		this.fos.getChannel().truncate(resumable.offset());
+		this.fileChannel.truncate(resumable.offset());
+		this.fos = Channels.newOutputStream(fileChannel);
 	}
 
 	@Override
@@ -85,12 +93,12 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
 
 	@Override
 	public void sync() throws IOException {
-		fos.getFD().sync();
+		fileChannel.force(true);
 	}
 
 	@Override
 	public long getPos() throws IOException {
-		return fos.getChannel().position();
+		return fileChannel.position();
 	}
 
 	@Override