You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/07 00:52:14 UTC
flink git commit: [FLINK-2825] FlinkClient.killTopology fails due to
missing leader session ID
Repository: flink
Updated Branches:
refs/heads/master af1e03e13 -> b489c3673
[FLINK-2825] FlinkClient.killTopology fails due to missing leader session ID
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b489c367
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b489c367
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b489c367
Branch: refs/heads/master
Commit: b489c367385453404671d0136ea59a9e63434ea7
Parents: af1e03e
Author: mjsax <mj...@apache.org>
Authored: Tue Oct 6 23:27:45 2015 +0200
Committer: mjsax <mj...@apache.org>
Committed: Tue Oct 6 23:27:45 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/storm/api/FlinkClient.java | 42 ++++++++++----------
1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b489c367/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 5f0ee21..b19cb38 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -46,7 +46,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;
@@ -211,31 +210,34 @@ public class FlinkClient {
public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
final JobID jobId = this.getTopologyJobId(name);
if (jobId == null) {
- throw new NotAliveException();
+ throw new NotAliveException("Storm topology with name " + name + " not found.");
}
- try {
- final ActorRef jobManager = this.getJobManager();
-
- if (options != null) {
- try {
- Thread.sleep(1000 * options.get_wait_secs());
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- final FiniteDuration askTimeout = this.getTimeout();
- final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+ if (options != null) {
try {
- Await.result(response, askTimeout);
- } catch (final Exception e) {
- throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
+ Thread.sleep(1000 * options.get_wait_secs());
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
}
+ }
+
+ final Configuration configuration = GlobalConfiguration.getConfiguration();
+ configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
+
+ final Client client;
+ try {
+ client = new Client(configuration);
} catch (final IOException e) {
- throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
- + ":" + this.jobManagerPort, e);
+ throw new RuntimeException("Could not establish a connection to the job manager", e);
}
+
+ try {
+ client.cancel(jobId);
+ } catch (final Exception e) {
+ throw new RuntimeException("Cannot stop job.", e);
+ }
+
}
// Flink specific additional methods