You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/15 20:31:03 UTC

flink git commit: [FLINK-2512] [storm compatibility] Make sure client is properly closed in FlinkSubmitter

Repository: flink
Updated Branches:
  refs/heads/master f350e45d9 -> c2b1eb796


[FLINK-2512] [storm compatibility] Make sure client is properly closed in FlinkSubmitter

This closes #1009


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2b1eb79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2b1eb79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2b1eb79

Branch: refs/heads/master
Commit: c2b1eb7961ed16b8985d7f8c02e87c0964b818e8
Parents: f350e45
Author: ffbin <86...@qq.com>
Authored: Wed Aug 12 10:06:21 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Aug 15 19:12:51 2015 +0200

----------------------------------------------------------------------
 .../stormcompatibility/api/FlinkSubmitter.java  | 29 ++++++++++----------
 1 file changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2b1eb79/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index 2648fe4..819dbbc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -100,22 +100,23 @@ public class FlinkSubmitter {
 		final String serConf = JSONValue.toJSONString(stormConf);
 
 		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
-		if (client.getTopologyJobId(name) != null) {
-			throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-		}
-		String localJar = System.getProperty("storm.jar");
-		if (localJar == null) {
-			try {
-				for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
-						.getJars()) {
-					// TODO verify that there is onnly one jar
-					localJar = file.getAbsolutePath();
+		try {
+			if (client.getTopologyJobId(name) != null) {
+				throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+			}
+			String localJar = System.getProperty("storm.jar");
+			if (localJar == null) {
+				try {
+					for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+							.getJars()) {
+						// TODO verify that there is onnly one jar
+						localJar = file.getAbsolutePath();
+					}
+				} catch (final ClassCastException e) {
+					// ignore
 				}
-			} catch (final ClassCastException e) {
-				// ignore
 			}
-		}
-		try {
+
 			logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
 			client.submitTopologyWithOpts(name, localJar, topology);
 		} catch (final InvalidTopologyException e) {