You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/15 22:23:48 UTC
[6/6] flink git commit: [FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream
[FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a912e60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a912e60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a912e60
Branch: refs/heads/master
Commit: 7a912e603a040acfc46d394e86794b2c031298cb
Parents: 9d238e1
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 13 15:13:21 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 15 23:20:44 2018 +0200
----------------------------------------------------------------------
.../LocalRecoverableFsDataOutputStream.java | 28 +++++++++++++-------
1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a912e60/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index fd8e8fe..df9288e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -27,10 +27,14 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,12 +48,16 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
private final File tempFile;
- private final FileOutputStream fos;
+ private final FileChannel fileChannel;
+
+ private final OutputStream fos;
LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
- this.fos = new FileOutputStream(tempFile);
+
+ this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+ this.fos = Channels.newOutputStream(fileChannel);
}
LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
@@ -57,15 +65,15 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
- throw new FileNotFoundException("File Not Found: " + tempFile);
+ throw new FileNotFoundException("File Not Found: " + tempFile.getAbsolutePath());
}
- if (tempFile.length() < resumable.offset()) {
- throw new IOException("Missing data in tmp file: " + tempFile);
+ this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+ if (this.fileChannel.position() < resumable.offset()) {
+ throw new IOException("Missing data in tmp file: " + tempFile.getAbsolutePath());
}
-
- this.fos = new FileOutputStream(this.tempFile, true);
- this.fos.getChannel().truncate(resumable.offset());
+ this.fileChannel.truncate(resumable.offset());
+ this.fos = Channels.newOutputStream(fileChannel);
}
@Override
@@ -85,12 +93,12 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
@Override
public void sync() throws IOException {
- fos.getFD().sync();
+ fileChannel.force(true);
}
@Override
public long getPos() throws IOException {
- return fos.getChannel().position();
+ return fileChannel.position();
}
@Override