You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/05 20:56:17 UTC

[4/4] flink git commit: [FLINK-5390] [yarn] Fix for proper closing input and output streams, in case of errors

[FLINK-5390] [yarn] Fix for proper closing input and output streams, in case of errors

This closes #3045


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f7ad84a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f7ad84a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f7ad84a

Branch: refs/heads/master
Commit: 9f7ad84abf7f1c33c4ee40be1eb0297a28a30f57
Parents: af5aa36
Author: Roman Maier <ro...@epam.com>
Authored: Mon Dec 26 17:24:11 2016 +0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 5 20:22:45 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/AbstractYarnClusterDescriptor.java |  8 ++++----
 .../flink/yarn/YarnFlinkApplicationMasterRunner.java     | 11 +++++++----
 2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f7ad84a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0d1239d..cf970b0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -721,10 +721,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			try {
 				File fp = File.createTempFile(appId.toString(), null);
 				fp.deleteOnExit();
-				FileOutputStream input = new FileOutputStream(fp);
-				ObjectOutputStream obInput = new ObjectOutputStream(input);
-				obInput.writeObject(jobGraph);
-				input.close();
+				try (FileOutputStream output = new FileOutputStream(fp);
+					ObjectOutputStream obOutput = new ObjectOutputStream(output);){
+					obOutput.writeObject(jobGraph);
+				}
 				LocalResource jobgraph = Records.newRecord(LocalResource.class);
 				Path remoteJobGraph =
 						Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());

http://git-wip-us.apache.org/repos/asf/flink/blob/9f7ad84a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 188d9ef..257212b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -53,6 +53,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 
 /**
@@ -271,10 +272,12 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 		if (jobGraphFile != null) {
 			File fp = new File(jobGraphFile);
 			if (fp.isFile()) {
-				FileInputStream input = new FileInputStream(fp);
-				ObjectInputStream obInput = new ObjectInputStream(input);
-				jg = (JobGraph) obInput.readObject();
-				input.close();
+				try (FileInputStream input = new FileInputStream(fp);
+					ObjectInputStream obInput = new ObjectInputStream(input);) {
+					jg = (JobGraph) obInput.readObject();
+				} catch (IOException e) {
+					LOG.warn("Failed to read job graph file", e);
+				}
 			}
 		}
 		if (jg == null) {