You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/23 10:14:01 UTC

[4/5] flink git commit: [FLINK-9841][rest] Close log file channel after response was fully written

[FLINK-9841][rest] Close log file channel after response was fully written

This closes #6329.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d58bf4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d58bf4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d58bf4a

Branch: refs/heads/master
Commit: 8d58bf4a110441a0f3c9d592f8f34ccacb7cb9a8
Parents: 9e0a4b5
Author: yanghua <ya...@gmail.com>
Authored: Fri Jul 13 16:48:04 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:19:20 2018 +0200

----------------------------------------------------------------------
 .../AbstractTaskManagerFileHandler.java         | 25 +++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d58bf4a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index edefa15..4c7ac94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -63,6 +63,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureList
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -208,11 +209,20 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 	}
 
 	private void transferFile(ChannelHandlerContext ctx, File file, HttpRequest httpRequest) throws FlinkException {
-		try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
-			final long fileLength = randomAccessFile.length();
+		final RandomAccessFile randomAccessFile;
 
-			try (final FileChannel fileChannel = randomAccessFile.getChannel()) {
+		try {
+			randomAccessFile = new RandomAccessFile(file, "r");
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Can not find file " + file + ".", e);
+		}
+
+		try {
 
+			final long fileLength = randomAccessFile.length();
+			final FileChannel fileChannel = randomAccessFile.getChannel();
+
+			try {
 				HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
 				response.headers().set(CONTENT_TYPE, "text/plain");
 
@@ -251,8 +261,17 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 				if (!HttpHeaders.isKeepAlive(httpRequest)) {
 					lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 				}
+			} catch (IOException ex) {
+				fileChannel.close();
+				throw ex;
 			}
 		} catch (IOException ioe) {
+			try {
+				randomAccessFile.close();
+			} catch (IOException e) {
+				throw new FlinkException("Close file or channel error.", e);
+			}
+
 			throw new FlinkException("Could not transfer file " + file + " to the client.", ioe);
 		}
 	}