You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/07 00:52:14 UTC

flink git commit: [FLINK-2825] FlinkClient.killTopology fails due to missing leader session ID

Repository: flink
Updated Branches:
  refs/heads/master af1e03e13 -> b489c3673


[FLINK-2825] FlinkClient.killTopology fails due to missing leader session ID


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b489c367
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b489c367
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b489c367

Branch: refs/heads/master
Commit: b489c367385453404671d0136ea59a9e63434ea7
Parents: af1e03e
Author: mjsax <mj...@apache.org>
Authored: Tue Oct 6 23:27:45 2015 +0200
Committer: mjsax <mj...@apache.org>
Committed: Tue Oct 6 23:27:45 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/storm/api/FlinkClient.java | 42 ++++++++++----------
 1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b489c367/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 5f0ee21..b19cb38 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -46,7 +46,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 
@@ -211,31 +210,34 @@ public class FlinkClient {
 	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
 		final JobID jobId = this.getTopologyJobId(name);
 		if (jobId == null) {
-			throw new NotAliveException();
+			throw new NotAliveException("Storm topology with name " + name + " not found.");
 		}
 
-		try {
-			final ActorRef jobManager = this.getJobManager();
-
-			if (options != null) {
-				try {
-					Thread.sleep(1000 * options.get_wait_secs());
-				} catch (final InterruptedException e) {
-					throw new RuntimeException(e);
-				}
-			}
-
-			final FiniteDuration askTimeout = this.getTimeout();
-			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+		if (options != null) {
 			try {
-				Await.result(response, askTimeout);
-			} catch (final Exception e) {
-				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
+				Thread.sleep(1000 * options.get_wait_secs());
+			} catch (final InterruptedException e) {
+				throw new RuntimeException(e);
 			}
+		}
+
+		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
+		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
+
+		final Client client;
+		try {
+			client = new Client(configuration);
 		} catch (final IOException e) {
-			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-					+ ":" + this.jobManagerPort, e);
+			throw new RuntimeException("Could not establish a connection to the job manager", e);
 		}
+
+		try {
+			client.cancel(jobId);
+		} catch (final Exception e) {
+			throw new RuntimeException("Cannot stop job.", e);
+		}
+
 	}
 
 	// Flink specific additional methods