You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/03/02 07:53:03 UTC
[03/12] flink git commit: [FLINK-8811] [flip6] Implement
MiniClusterClient#cancel
[FLINK-8811] [flip6] Implement MiniClusterClient#cancel
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8039464d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8039464d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8039464d
Branch: refs/heads/master
Commit: 8039464df8f315b1fd06831e11dfc2ef4466b888
Parents: 96a176a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Feb 28 17:54:06 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Mar 2 08:52:32 2018 +0100
----------------------------------------------------------------------
.../apache/flink/client/program/MiniClusterClient.java | 2 +-
.../apache/flink/runtime/minicluster/MiniCluster.java | 13 ++++++++++++-
2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8039464d/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index e99addd..b98e895 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -82,7 +82,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
@Override
public void cancel(JobID jobId) throws Exception {
- throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
+ miniCluster.cancelJob(jobId);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8039464d/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 5b086ca..2efdb03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -467,7 +467,18 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
} catch (LeaderRetrievalException | InterruptedException e) {
return FutureUtils.completedExceptionally(
new FlinkException(
- String.format("Could not retrieve job status for job %s", jobId),
+ String.format("Could not retrieve job status for job %s.", jobId),
+ e));
+ }
+ }
+
+ public CompletableFuture<Acknowledge> cancelJob(JobID jobId) {
+ try {
+ return getDispatcherGateway().cancelJob(jobId, rpcTimeout);
+ } catch (LeaderRetrievalException | InterruptedException e) {
+ return FutureUtils.completedExceptionally(
+ new FlinkException(
+ String.format("Could not cancel job %s.", jobId),
e));
}
}