You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2019/11/29 13:53:32 UTC

[flink] 04/09: [FLINK-14762][client] Implement JobClient#cancel

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 318e406e62ea64ffc656782d8867f0f72fc6468a
Author: tison <wa...@gmail.com>
AuthorDate: Fri Nov 29 09:51:24 2019 +0800

    [FLINK-14762][client] Implement JobClient#cancel
---
 .../flink/client/deployment/ClusterClientJobClientAdapter.java      | 6 ++++++
 .../src/main/java/org/apache/flink/core/execution/JobClient.java    | 5 +++++
 .../src/test/java/org/apache/flink/api/java/TestingJobClient.java   | 5 +++++
 .../org/apache/flink/streaming/environment/TestingJobClient.java    | 5 +++++
 4 files changed, 21 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 6ba558d..602d64d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.FunctionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -56,6 +57,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
 	}
 
 	@Override
+	public CompletableFuture<Void> cancel() {
+		return clusterClient.cancel(jobID).thenApply(FunctionUtils.nullFn());
+	}
+
+	@Override
 	public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
 		checkNotNull(userClassloader);
 
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index fd99840..9f4614c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -36,6 +36,11 @@ public interface JobClient extends AutoCloseable {
 	JobID getJobID();
 
 	/**
+	 * Cancels the associated job.
+	 */
+	CompletableFuture<Void> cancel();
+
+	/**
 	 * Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
 	 *
 	 * @param userClassloader the classloader used to de-serialize the accumulators of the job.
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index f8af3fc..215efed 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -40,4 +40,9 @@ public class TestingJobClient implements JobClient {
 		return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
 	}
 
+	@Override
+	public CompletableFuture<Void> cancel() {
+		return CompletableFuture.completedFuture(null);
+	}
+
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index a36c6e1..af1025c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -40,4 +40,9 @@ public class TestingJobClient implements JobClient {
 		return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
 	}
 
+	@Override
+	public CompletableFuture<Void> cancel() {
+		return CompletableFuture.completedFuture(null);
+	}
+
 }