You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/05 20:56:17 UTC
[4/4] flink git commit: [FLINK-5390] [yarn] Fix for proper closing
input and output streams, in case of errors
[FLINK-5390] [yarn] Fix for proper closing input and output streams, in case of errors
This closes #3045
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f7ad84a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f7ad84a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f7ad84a
Branch: refs/heads/master
Commit: 9f7ad84abf7f1c33c4ee40be1eb0297a28a30f57
Parents: af5aa36
Author: Roman Maier <ro...@epam.com>
Authored: Mon Dec 26 17:24:11 2016 +0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 5 20:22:45 2017 +0100
----------------------------------------------------------------------
.../apache/flink/yarn/AbstractYarnClusterDescriptor.java | 8 ++++----
.../flink/yarn/YarnFlinkApplicationMasterRunner.java | 11 +++++++----
2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f7ad84a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0d1239d..cf970b0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -721,10 +721,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
try {
File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
- FileOutputStream input = new FileOutputStream(fp);
- ObjectOutputStream obInput = new ObjectOutputStream(input);
- obInput.writeObject(jobGraph);
- input.close();
+ try (FileOutputStream output = new FileOutputStream(fp);
+ ObjectOutputStream obOutput = new ObjectOutputStream(output);){
+ obOutput.writeObject(jobGraph);
+ }
LocalResource jobgraph = Records.newRecord(LocalResource.class);
Path remoteJobGraph =
Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
http://git-wip-us.apache.org/repos/asf/flink/blob/9f7ad84a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 188d9ef..257212b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -53,6 +53,7 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
/**
@@ -271,10 +272,12 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
if (jobGraphFile != null) {
File fp = new File(jobGraphFile);
if (fp.isFile()) {
- FileInputStream input = new FileInputStream(fp);
- ObjectInputStream obInput = new ObjectInputStream(input);
- jg = (JobGraph) obInput.readObject();
- input.close();
+ try (FileInputStream input = new FileInputStream(fp);
+ ObjectInputStream obInput = new ObjectInputStream(input);) {
+ jg = (JobGraph) obInput.readObject();
+ } catch (IOException e) {
+ LOG.warn("Failed to read job graph file", e);
+ }
}
}
if (jg == null) {