You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/15 20:31:03 UTC
flink git commit: [FLINK-2512] [storm compatibility] Make sure client
is properly closed in FlinkSubmitter
Repository: flink
Updated Branches:
refs/heads/master f350e45d9 -> c2b1eb796
[FLINK-2512] [storm compatibility] Make sure client is properly closed in FlinkSubmitter
This closes #1009
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2b1eb79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2b1eb79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2b1eb79
Branch: refs/heads/master
Commit: c2b1eb7961ed16b8985d7f8c02e87c0964b818e8
Parents: f350e45
Author: ffbin <86...@qq.com>
Authored: Wed Aug 12 10:06:21 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Aug 15 19:12:51 2015 +0200
----------------------------------------------------------------------
.../stormcompatibility/api/FlinkSubmitter.java | 29 ++++++++++----------
1 file changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c2b1eb79/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index 2648fe4..819dbbc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -100,22 +100,23 @@ public class FlinkSubmitter {
final String serConf = JSONValue.toJSONString(stormConf);
final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
- if (client.getTopologyJobId(name) != null) {
- throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
- }
- String localJar = System.getProperty("storm.jar");
- if (localJar == null) {
- try {
- for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
- .getJars()) {
- // TODO verify that there is onnly one jar
- localJar = file.getAbsolutePath();
+ try {
+ if (client.getTopologyJobId(name) != null) {
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+ }
+ String localJar = System.getProperty("storm.jar");
+ if (localJar == null) {
+ try {
+ for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+ .getJars()) {
+ // TODO verify that there is onnly one jar
+ localJar = file.getAbsolutePath();
+ }
+ } catch (final ClassCastException e) {
+ // ignore
}
- } catch (final ClassCastException e) {
- // ignore
}
- }
- try {
+
logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.submitTopologyWithOpts(name, localJar, topology);
} catch (final InvalidTopologyException e) {