You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/23 10:14:01 UTC
[4/5] flink git commit: [FLINK-9841][rest] Close log file channel after response was fully written
[FLINK-9841][rest] Close log file channel after response was fully written
This closes #6329.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d58bf4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d58bf4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d58bf4a
Branch: refs/heads/master
Commit: 8d58bf4a110441a0f3c9d592f8f34ccacb7cb9a8
Parents: 9e0a4b5
Author: yanghua <ya...@gmail.com>
Authored: Fri Jul 13 16:48:04 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:19:20 2018 +0200
----------------------------------------------------------------------
.../AbstractTaskManagerFileHandler.java | 25 +++++++++++++++++---
1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8d58bf4a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index edefa15..4c7ac94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -63,6 +63,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureList
import javax.annotation.Nonnull;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@@ -208,11 +209,20 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
}
private void transferFile(ChannelHandlerContext ctx, File file, HttpRequest httpRequest) throws FlinkException {
- try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
- final long fileLength = randomAccessFile.length();
+ final RandomAccessFile randomAccessFile;
- try (final FileChannel fileChannel = randomAccessFile.getChannel()) {
+ try {
+ randomAccessFile = new RandomAccessFile(file, "r");
+ } catch (FileNotFoundException e) {
+ throw new FlinkException("Can not find file " + file + ".", e);
+ }
+
+ try {
+ final long fileLength = randomAccessFile.length();
+ final FileChannel fileChannel = randomAccessFile.getChannel();
+
+ try {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
response.headers().set(CONTENT_TYPE, "text/plain");
@@ -251,8 +261,17 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
if (!HttpHeaders.isKeepAlive(httpRequest)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
+ } catch (IOException ex) {
+ fileChannel.close();
+ throw ex;
}
} catch (IOException ioe) {
+ try {
+ randomAccessFile.close();
+ } catch (IOException e) {
+ throw new FlinkException("Close file or channel error.", e);
+ }
+
throw new FlinkException("Could not transfer file " + file + " to the client.", ioe);
}
}