You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 16:30:30 UTC
flink git commit: [client] send cancel message via the job manager's
ActorGateway
Repository: flink
Updated Branches:
refs/heads/master ca6dd4275 -> 1b3bdce5c
[client] send cancel message via the job manager's ActorGateway
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b3bdce5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b3bdce5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b3bdce5
Branch: refs/heads/master
Commit: 1b3bdce5ccb24a485ebc6452d16f78c770486481
Parents: ca6dd42
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jul 30 15:03:23 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 15:24:57 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/client/program/Client.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b3bdce5/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 50590df..1de0703 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -27,8 +27,6 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -459,11 +457,17 @@ public class Client {
try {
jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout);
} catch (Exception e) {
- LOG.error("Error in getting the remote reference for the job manager", e);
- throw new ProgramInvocationException("Failed to resolve JobManager", e);
+ throw new ProgramInvocationException("Error getting the remote actor reference for the job manager.", e);
+ }
+
+ Future<Object> response;
+ try {
+ ActorGateway jobManagerGateway = JobManager.getJobManagerGateway(jobManager, timeout);
+ response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
+ } catch (Exception e) {
+ throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
}
- Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(timeout));
Object result = Await.result(response, timeout);
if (result instanceof JobManagerMessages.CancellationSuccess) {