You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/03 20:50:25 UTC
[5/8] flink git commit: [FLINK-2322] [streaming] Close file streams to release resources early.
[FLINK-2322] [streaming] Close file streams to release resources early.
This closes #928
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0693c92b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0693c92b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0693c92b
Branch: refs/heads/master
Commit: 0693c92bdda655e1fbce232038909a7c2a385a22
Parents: fab61a1
Author: tedyu <yu...@gmail.com>
Authored: Tue Jul 21 11:13:17 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/sca/UdfAnalyzerUtils.java | 19 ++++++++++++++++---
.../flink/api/java/utils/ParameterTool.java | 4 +++-
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index df1e421..dbfd29e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -32,6 +32,7 @@ import org.objectweb.asm.tree.analysis.BasicValue;
import org.objectweb.asm.tree.analysis.Value;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
@@ -114,12 +115,14 @@ public final class UdfAnalyzerUtils {
*/
@SuppressWarnings("unchecked")
public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+ InputStream stream = null;
try {
// iterate through hierarchy and search for method node /
// class that really implements the method
while (internalClassName != null) {
- ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
- .getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+ stream = Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+ ClassReader cr = new ClassReader(stream);
final ClassNode cn = new ClassNode();
cr.accept(cn, 0);
for (MethodNode mn : (List<MethodNode>) cn.methods) {
@@ -129,9 +132,19 @@ public final class UdfAnalyzerUtils {
}
internalClassName = cr.getSuperName();
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new IllegalStateException("Method '" + name + "' could not be found", e);
}
+ finally {
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ // best effort cleanup
+ }
+ }
+ }
throw new IllegalStateException("Method '" + name + "' could not be found");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 317dce4..b60a559 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -130,7 +130,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
throw new FileNotFoundException("Properties file "+path+" does not exist");
}
Properties props = new Properties();
- props.load(new FileInputStream(propertiesFile));
+ FileInputStream fis = new FileInputStream(propertiesFile);
+ props.load(fis);
+ fis.close();
return fromMap((Map)props);
}