You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/03/02 07:53:03 UTC

[03/12] flink git commit: [FLINK-8811] [flip6] Implement MiniClusterClient#cancel

[FLINK-8811] [flip6] Implement MiniClusterClient#cancel


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8039464d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8039464d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8039464d

Branch: refs/heads/master
Commit: 8039464df8f315b1fd06831e11dfc2ef4466b888
Parents: 96a176a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Feb 28 17:54:06 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Mar 2 08:52:32 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/MiniClusterClient.java |  2 +-
 .../apache/flink/runtime/minicluster/MiniCluster.java  | 13 ++++++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8039464d/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index e99addd..b98e895 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -82,7 +82,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
 	@Override
 	public void cancel(JobID jobId) throws Exception {
-		throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
+		miniCluster.cancelJob(jobId);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8039464d/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 5b086ca..2efdb03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -467,7 +467,18 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		} catch (LeaderRetrievalException | InterruptedException e) {
 			return FutureUtils.completedExceptionally(
 				new FlinkException(
-					String.format("Could not retrieve job status for job %s", jobId),
+					String.format("Could not retrieve job status for job %s.", jobId),
+					e));
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancelJob(JobID jobId) {
+		try {
+			return getDispatcherGateway().cancelJob(jobId, rpcTimeout);
+		} catch (LeaderRetrievalException | InterruptedException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Could not cancel job %s.", jobId),
 					e));
 		}
 	}