You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2019/11/29 13:53:32 UTC
[flink] 04/09: [FLINK-14762][client] Implement JobClient#cancel
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 318e406e62ea64ffc656782d8867f0f72fc6468a
Author: tison <wa...@gmail.com>
AuthorDate: Fri Nov 29 09:51:24 2019 +0800
[FLINK-14762][client] Implement JobClient#cancel
---
.../flink/client/deployment/ClusterClientJobClientAdapter.java | 6 ++++++
.../src/main/java/org/apache/flink/core/execution/JobClient.java | 5 +++++
.../src/test/java/org/apache/flink/api/java/TestingJobClient.java | 5 +++++
.../org/apache/flink/streaming/environment/TestingJobClient.java | 5 +++++
4 files changed, 21 insertions(+)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 6ba558d..602d64d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.FunctionUtils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -56,6 +57,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
}
@Override
+ public CompletableFuture<Void> cancel() {
+ return clusterClient.cancel(jobID).thenApply(FunctionUtils.nullFn());
+ }
+
+ @Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
checkNotNull(userClassloader);
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index fd99840..9f4614c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -36,6 +36,11 @@ public interface JobClient extends AutoCloseable {
JobID getJobID();
/**
+ * Cancels the associated job.
+ */
+ CompletableFuture<Void> cancel();
+
+ /**
* Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
*
* @param userClassloader the classloader used to de-serialize the accumulators of the job.
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index f8af3fc..215efed 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -40,4 +40,9 @@ public class TestingJobClient implements JobClient {
return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
}
+ @Override
+ public CompletableFuture<Void> cancel() {
+ return CompletableFuture.completedFuture(null);
+ }
+
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index a36c6e1..af1025c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -40,4 +40,9 @@ public class TestingJobClient implements JobClient {
return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
}
+ @Override
+ public CompletableFuture<Void> cancel() {
+ return CompletableFuture.completedFuture(null);
+ }
+
}