You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/27 23:06:54 UTC

flink git commit: [FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

Repository: flink
Updated Branches:
  refs/heads/master 4debc6033 -> 8ea4db1a8


[FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

Let the JobCancellationHandler implement the LegacyRestHandler interface. Moreover,
this commit adds the DELETE method to HttpMethodWrapper and the
RestServerEndpoint#registerHandler method.

Add PATCH method

This closes #4697.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ea4db1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ea4db1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ea4db1a

Branch: refs/heads/master
Commit: 8ea4db1a8b368b4e00dd310c0d07405fd2142b34
Parents: 4debc60
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 21 10:53:24 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 01:06:39 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   | 16 +++++
 .../flink/runtime/dispatcher/Dispatcher.java    | 12 ++++
 .../runtime/dispatcher/DispatcherGateway.java   | 17 ++++-
 .../dispatcher/DispatcherRestEndpoint.java      | 14 ++++
 .../flink/runtime/jobmaster/JobMaster.java      |  7 ++
 .../runtime/jobmaster/JobMasterGateway.java     | 39 +++++++----
 .../messages/FlinkJobNotFoundException.java     | 34 ++++++++++
 .../flink/runtime/rest/HttpMethodWrapper.java   |  4 +-
 .../flink/runtime/rest/RestServerEndpoint.java  |  8 +++
 .../rest/handler/AbstractRestHandler.java       | 15 +++--
 .../rest/handler/RestHandlerException.java      | 12 ++--
 .../handler/legacy/JobCancellationHandler.java  | 49 +++++++++++++-
 .../rest/messages/EmptyResponseBody.java        | 37 +++++++++++
 .../rest/messages/JobCancellationHeaders.java   | 70 ++++++++++++++++++++
 .../rest/messages/JobIDPathParameter.java       | 43 ++++++++++++
 .../rest/messages/JobMessageParameters.java     | 42 ++++++++++++
 16 files changed, 394 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d141ecb..77d4643 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Optional;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -316,6 +317,21 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Unpacks an {@link CompletionException} and returns its cause. Otherwise the given
+	 * Throwable is returned.
+	 *
+	 * @param throwable to unpack if it is an CompletionException
+	 * @return Cause of CompletionException or given Throwable
+	 */
+	public static Throwable stripCompletionException(Throwable throwable) {
+		while (throwable instanceof CompletionException && throwable.getCause() != null) {
+			throwable = throwable.getCause();
+		}
+
+		return throwable;
+	}
+
+	/**
 	 * Tries to find a {@link SerializedThrowable} as the cause of the given throwable and throws its
 	 * deserialized value. If there is no such throwable, then the original throwable is thrown.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 8af3434..35a9b37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
@@ -242,6 +243,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+		JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+
+		if (jobManagerRunner == null) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		} else {
+			return jobManagerRunner.getJobManagerGateway().cancel(timeout);
+		}
+	}
+
+	@Override
 	public CompletableFuture<String> requestRestAddress(Time timeout) {
 		return restAddressFuture;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 6aaf0b6..1f6d801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -48,7 +48,7 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 		@RpcTimeout Time timeout);
 
 	/**
-	 * Lists the current set of submitted jobs.
+	 * List the current set of submitted jobs.
 	 *
 	 * @param timeout RPC timeout
 	 * @return A future collection of currently submitted jobs
@@ -56,6 +56,21 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	CompletableFuture<Collection<JobID>> listJobs(
 		@RpcTimeout Time timeout);
 
+	/**
+	 * Cancel the given job.
+	 *
+	 * @param jobId identifying the job to cancel
+	 * @param timeout of the operation
+	 * @return A future acknowledge if the cancellation succeeded
+	 */
+	CompletableFuture<Acknowledge> cancelJob(JobID jobId, @RpcTimeout Time timeout);
+
+	/**
+	 * Request the cluster overview.
+	 *
+	 * @param timeout of the operation
+	 * @return Future {@link StatusOverview} containing the cluster information
+	 */
 	CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);
 
 	CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 17f5616..d56eb17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
@@ -41,6 +42,9 @@ import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FileUtils;
@@ -114,6 +118,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 				true,
 				true));
 
+		LegacyRestHandlerAdapter<DispatcherGateway, EmptyResponseBody, JobMessageParameters> jobCancellationHandler = new LegacyRestHandlerAdapter<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			JobCancellationHeaders.getInstance(),
+			new JobCancellationHandler(
+				executor,
+				timeout));
+
 		LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
 			restAddressFuture,
 			leaderRetriever,
@@ -142,6 +155,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
 		handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
+		handlers.add(Tuple2.of(JobCancellationHeaders.getInstance(), jobCancellationHandler));
 
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 19fe4a6..a9e5cd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -357,6 +357,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	// RPC methods
 	//----------------------------------------------------------------------------------------------
 
+	@Override
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		executionGraph.cancel();
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
 	/**
 	 * Updates the task execution state for a given task.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index c2fba47..946bb5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -55,6 +55,14 @@ import java.util.concurrent.CompletableFuture;
 public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
 
 	/**
+	 * Cancels the currently executed job.
+	 *
+	 * @param timeout of this operation
+	 * @return Future acknowledge of the operation
+	 */
+	CompletableFuture<Acknowledge> cancel(@RpcTimeout Time timeout);
+
+	/**
 	 * Updates the task execution state for a given task.
 	 *
 	 * @param taskExecutionState New task execution state for a given task
@@ -64,8 +72,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 			final TaskExecutionState taskExecutionState);
 
 	/**
-	 * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
-	 * as a {@link SerializedInputSplit} message.
+	 * Requests the next input split for the {@link ExecutionJobVertex}.
+	 * The next input split is sent back to the sender as a
+	 * {@link SerializedInputSplit} message.
 	 *
 	 * @param vertexID         The job vertex id
 	 * @param executionAttempt The execution attempt id
@@ -76,8 +85,8 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 			final ExecutionAttemptID executionAttempt);
 
 	/**
-	 * Requests the current state of the partition.
-	 * The state of a partition is currently bound to the state of the producing execution.
+	 * Requests the current state of the partition. The state of a
+	 * partition is currently bound to the state of the producing execution.
 	 *
 	 * @param intermediateResultId The execution attempt ID of the task requesting the partition state.
 	 * @param partitionId          The partition ID of the partition to request the state of.
@@ -89,12 +98,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 
 	/**
 	 * Notifies the JobManager about available data for a produced partition.
-	 * <p>
-	 * There is a call to this method for each {@link ExecutionVertex} instance once per produced
+	 *
+	 * <p>There is a call to this method for each {@link ExecutionVertex} instance once per produced
 	 * {@link ResultPartition} instance, either when first producing data (for pipelined executions)
 	 * or when all data has been produced (for staged executions).
-	 * <p>
-	 * The JobManager then can decide when to schedule the partition consumers of the given session.
+	 *
+	 * <p>The JobManager then can decide when to schedule the partition consumers of the given session.
 	 *
 	 * @param partitionID     The partition which has already produced data
 	 * @param timeout         before the rpc call fails
@@ -132,6 +141,8 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName);
 
 	/**
+	 * Notifies that queryable state has been registered.
+	 *
 	 * @param jobVertexId          JobVertexID the KvState instance belongs to.
 	 * @param keyGroupRange        Key group range the KvState instance belongs to.
 	 * @param registrationName     Name under which the KvState has been registered.
@@ -146,6 +157,8 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 			final KvStateServerAddress kvStateServerAddress);
 
 	/**
+	 * Notifies that queryable state has been unregistered.
+	 *
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to.
 	 * @param keyGroupRange    Key group index the KvState instance belongs to.
 	 * @param registrationName Name under which the KvState has been registered.
@@ -161,7 +174,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	CompletableFuture<ClassloadingProps> requestClassloadingProps();
 
 	/**
-	 * Offer the given slots to the job manager. The response contains the set of accepted slots.
+	 * Offers the given slots to the job manager. The response contains the set of accepted slots.
 	 *
 	 * @param taskManagerId identifying the task manager
 	 * @param slots         to offer to the job manager
@@ -174,7 +187,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 			@RpcTimeout final Time timeout);
 
 	/**
-	 * Fail the slot with the given allocation id and cause.
+	 * Fails the slot with the given allocation id and cause.
 	 *
 	 * @param taskManagerId identifying the task manager
 	 * @param allocationId  identifying the slot to fail
@@ -185,7 +198,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 			final Exception cause);
 
 	/**
-	 * Register the task manager at the job manager.
+	 * Registers the task manager at the job manager.
 	 *
 	 * @param taskManagerRpcAddress the rpc address of the task manager
 	 * @param taskManagerLocation   location of the task manager
@@ -198,14 +211,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 			@RpcTimeout final Time timeout);
 
 	/**
-	 * Send the heartbeat to job manager from task manager
+	 * Sends the heartbeat to job manager from task manager
 	 *
 	 * @param resourceID unique id of the task manager
 	 */
 	void heartbeatFromTaskManager(final ResourceID resourceID);
 
 	/**
-	 * Heartbeat request from the resource manager
+	 * Sends heartbeat request from the resource manager
 	 *
 	 * @param resourceID unique id of the resource manager
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
new file mode 100644
index 0000000..95686ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which is returned if a Flink job could not be found.
+ */
+public class FlinkJobNotFoundException extends FlinkException {
+
+	private static final long serialVersionUID = -7803390762010615384L;
+
+	public FlinkJobNotFoundException(JobID jobId) {
+		super("Could not find Flink job (" + jobId + ").");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
index 8987d75..fd1b22b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
@@ -25,7 +25,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
  */
 public enum HttpMethodWrapper {
 	GET(HttpMethod.GET),
-	POST(HttpMethod.POST);
+	POST(HttpMethod.POST),
+	DELETE(HttpMethod.DELETE),
+	PATCH(HttpMethod.PATCH);
 
 	private HttpMethod nettyHttpMethod;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index ec6e5b0..d09aad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -256,6 +256,14 @@ public abstract class RestServerEndpoint {
 			case POST:
 				router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
+			case DELETE:
+				router.DELETE(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				break;
+			case PATCH:
+				router.PATCH(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				break;
+			default:
+				throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 948ea07..ee24dce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
@@ -140,11 +141,17 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 				response = FutureUtils.completedExceptionally(e);
 			}
 
-			response.whenComplete((P resp, Throwable error) -> {
-				if (error != null) {
+			response.whenComplete((P resp, Throwable throwable) -> {
+				if (throwable != null) {
+
+					Throwable error = ExceptionUtils.stripCompletionException(throwable);
+
 					if (error instanceof RestHandlerException) {
-						RestHandlerException rhe = (RestHandlerException) error;
-						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus());
+						final RestHandlerException rhe = (RestHandlerException) error;
+
+						log.error("Exception occurred in REST handler.", error);
+
+						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
 					} else {
 						log.error("Implementation error: Unhandled exception.", error);
 						HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index 4cbb542..7ae8939 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -18,24 +18,26 @@
 
 package org.apache.flink.runtime.rest.handler;
 
+import org.apache.flink.util.FlinkException;
+
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /**
  * An exception that is thrown if the failure of a REST operation was detected by a handler.
  */
-public class RestHandlerException extends Exception {
+public class RestHandlerException extends FlinkException {
 	private static final long serialVersionUID = -1358206297964070876L;
 
-	private final String errorMessage;
 	private final int responseCode;
 
 	public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus) {
-		this.errorMessage = errorMessage;
+		super(errorMessage);
 		this.responseCode = httpResponseStatus.code();
 	}
 
-	public String getErrorMessage() {
-		return errorMessage;
+	public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus, Throwable cause) {
+		super(errorMessage, cause);
+		this.responseCode = httpResponseStatus.code();
 	}
 
 	public HttpResponseStatus getHttpResponseStatus() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
index 9e9849f..7b5bdc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
@@ -20,20 +20,34 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Request handler for the CANCEL request.
  */
-public class JobCancellationHandler extends AbstractJsonRequestHandler {
+public class JobCancellationHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, EmptyResponseBody, JobMessageParameters> {
 
 	private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
 	private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
@@ -70,4 +84,37 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 			},
 			executor);
 	}
+
+	@Override
+	public CompletableFuture<EmptyResponseBody> handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, DispatcherGateway gateway) {
+		final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+
+		CompletableFuture<Acknowledge> cancelFuture = gateway.cancelJob(jobId, timeout);
+
+		return cancelFuture.handle(
+			(Acknowledge ack, Throwable throwable) -> {
+				if (throwable != null) {
+					Throwable error = ExceptionUtils.stripCompletionException(throwable);
+
+					if (error instanceof TimeoutException) {
+						throw new CompletionException(
+							new RestHandlerException(
+								"Job cancellation timed out.",
+								HttpResponseStatus.REQUEST_TIMEOUT, error));
+					} else if (error instanceof FlinkJobNotFoundException) {
+						throw new CompletionException(
+							new RestHandlerException(
+								"Job could not be found.",
+								HttpResponseStatus.NOT_FOUND, error));
+					} else {
+						throw new CompletionException(
+							new RestHandlerException(
+								"Job cancellation failed: " + error.getMessage(),
+								HttpResponseStatus.INTERNAL_SERVER_ERROR, error));
+					}
+				} else {
+					return EmptyResponseBody.getInstance();
+				}
+			});
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java
new file mode 100644
index 0000000..8dc4787
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Empty {@link ResponseBody} implementation.
+ */
+public class EmptyResponseBody implements ResponseBody {
+
+	private static final EmptyResponseBody INSTANCE = new EmptyResponseBody();
+
+	private EmptyResponseBody() {}
+
+	private Object readResolve() {
+		return INSTANCE;
+	}
+
+	public static EmptyResponseBody getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
new file mode 100644
index 0000000..82f022b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobCancellationHandler}.
+ */
+public class JobCancellationHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, JobMessageParameters> {
+
+	public static final String URL = "/jobs/:jobid";
+
+	private static final JobCancellationHeaders INSTANCE = new JobCancellationHeaders();
+
+	private JobCancellationHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<EmptyResponseBody> getResponseClass() {
+		return EmptyResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.ACCEPTED;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.PATCH;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobCancellationHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
new file mode 100644
index 0000000..a4ae0f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Path parameter identifying jobs.
+ */
+public class JobIDPathParameter extends MessagePathParameter<JobID> {
+
+	private static final String JOB_ID = "jobid";
+
+	public JobIDPathParameter() {
+		super(JOB_ID);
+	}
+
+	@Override
+	protected JobID convertFromString(String value) {
+		return JobID.fromHexString(value);
+	}
+
+	@Override
+	protected String convertToString(JobID value) {
+		return value.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
new file mode 100644
index 0000000..d77a29f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * <p>A job related REST handler always requires a {@link JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
+
+	private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.singleton(jobPathParameter);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.emptyList();
+	}
+}