You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 16:30:30 UTC

flink git commit: [client] send cancel message via the job manager's ActorGateway

Repository: flink
Updated Branches:
  refs/heads/master ca6dd4275 -> 1b3bdce5c


[client] send cancel message via the job manager's ActorGateway


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b3bdce5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b3bdce5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b3bdce5

Branch: refs/heads/master
Commit: 1b3bdce5ccb24a485ebc6452d16f78c770486481
Parents: ca6dd42
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jul 30 15:03:23 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 15:24:57 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/client/program/Client.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b3bdce5/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 50590df..1de0703 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -27,8 +27,6 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -459,11 +457,17 @@ public class Client {
 		try {
 			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout);
 		} catch (Exception e) {
-			LOG.error("Error in getting the remote reference for the job manager", e);
-			throw new ProgramInvocationException("Failed to resolve JobManager", e);
+			throw new ProgramInvocationException("Error getting the remote actor reference for the job manager.", e);
+		}
+
+		Future<Object> response;
+		try {
+			ActorGateway jobManagerGateway = JobManager.getJobManagerGateway(jobManager, timeout);
+			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
+		} catch (Exception e) {
+			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
 		}
 
-		Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(timeout));
 		Object result = Await.result(response, timeout);
 
 		if (result instanceof JobManagerMessages.CancellationSuccess) {