You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/03 20:50:25 UTC

[5/8] flink git commit: [FLINK-2322] [streaming] Close file streams to release resources early.

[FLINK-2322] [streaming] Close file streams to release resources early.

This closes #928


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0693c92b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0693c92b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0693c92b

Branch: refs/heads/master
Commit: 0693c92bdda655e1fbce232038909a7c2a385a22
Parents: fab61a1
Author: tedyu <yu...@gmail.com>
Authored: Tue Jul 21 11:13:17 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/sca/UdfAnalyzerUtils.java     | 19 ++++++++++++++++---
 .../flink/api/java/utils/ParameterTool.java      |  4 +++-
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index df1e421..dbfd29e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -32,6 +32,7 @@ import org.objectweb.asm.tree.analysis.BasicValue;
 import org.objectweb.asm.tree.analysis.Value;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -114,12 +115,14 @@ public final class UdfAnalyzerUtils {
 	 */
 	@SuppressWarnings("unchecked")
 	public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+		InputStream stream = null;
 		try {
 			// iterate through hierarchy and search for method node /
 			// class that really implements the method
 			while (internalClassName != null) {
-				ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
-						.getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+				stream = Thread.currentThread().getContextClassLoader()
+						.getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+				ClassReader cr = new ClassReader(stream);
 				final ClassNode cn = new ClassNode();
 				cr.accept(cn, 0);
 				for (MethodNode mn : (List<MethodNode>) cn.methods) {
@@ -129,9 +132,19 @@ public final class UdfAnalyzerUtils {
 				}
 				internalClassName = cr.getSuperName();
 			}
-		} catch (IOException e) {
+		}
+		catch (IOException e) {
 			throw new IllegalStateException("Method '" + name + "' could not be found", e);
 		}
+		finally {
+			if (stream != null) {
+				try {
+					stream.close();
+				} catch (IOException e) { 
+					// best effort cleanup
+				}
+			}
+		}
 		throw new IllegalStateException("Method '" + name + "' could not be found");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 317dce4..b60a559 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -130,7 +130,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 			throw new FileNotFoundException("Properties file "+path+" does not exist");
 		}
 		Properties props = new Properties();
-		props.load(new FileInputStream(propertiesFile));
+		FileInputStream fis = new FileInputStream(propertiesFile);
+		props.load(fis);
+		fis.close();
 		return fromMap((Map)props);
 	}