You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/27 23:06:54 UTC
flink git commit: [FLINK-7650] [flip6] Port JobCancellationHandler to
new REST endpoint
Repository: flink
Updated Branches:
refs/heads/master 4debc6033 -> 8ea4db1a8
[FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint
Let the JobCancellationHandler implement the LegacyRestHandler interface. Moreover,
this commit adds the DELETE method to HttpMethodWrapper and the
RestServerEndpoint#registerHandler method.
Add PATCH method
This closes #4697.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ea4db1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ea4db1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ea4db1a
Branch: refs/heads/master
Commit: 8ea4db1a8b368b4e00dd310c0d07405fd2142b34
Parents: 4debc60
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 21 10:53:24 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 01:06:39 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 16 +++++
.../flink/runtime/dispatcher/Dispatcher.java | 12 ++++
.../runtime/dispatcher/DispatcherGateway.java | 17 ++++-
.../dispatcher/DispatcherRestEndpoint.java | 14 ++++
.../flink/runtime/jobmaster/JobMaster.java | 7 ++
.../runtime/jobmaster/JobMasterGateway.java | 39 +++++++----
.../messages/FlinkJobNotFoundException.java | 34 ++++++++++
.../flink/runtime/rest/HttpMethodWrapper.java | 4 +-
.../flink/runtime/rest/RestServerEndpoint.java | 8 +++
.../rest/handler/AbstractRestHandler.java | 15 +++--
.../rest/handler/RestHandlerException.java | 12 ++--
.../handler/legacy/JobCancellationHandler.java | 49 +++++++++++++-
.../rest/messages/EmptyResponseBody.java | 37 +++++++++++
.../rest/messages/JobCancellationHeaders.java | 70 ++++++++++++++++++++
.../rest/messages/JobIDPathParameter.java | 43 ++++++++++++
.../rest/messages/JobMessageParameters.java | 42 ++++++++++++
16 files changed, 394 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d141ecb..77d4643 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Optional;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -316,6 +317,21 @@ public final class ExceptionUtils {
}
/**
+ * Unpacks an {@link CompletionException} and returns its cause. Otherwise the given
+ * Throwable is returned.
+ *
+ * @param throwable to unpack if it is an CompletionException
+ * @return Cause of CompletionException or given Throwable
+ */
+ public static Throwable stripCompletionException(Throwable throwable) {
+ while (throwable instanceof CompletionException && throwable.getCause() != null) {
+ throwable = throwable.getCause();
+ }
+
+ return throwable;
+ }
+
+ /**
* Tries to find a {@link SerializedThrowable} as the cause of the given throwable and throws its
* deserialized value. If there is no such throwable, then the original throwable is thrown.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 8af3434..35a9b37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
@@ -242,6 +243,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
@Override
+ public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+ JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+
+ if (jobManagerRunner == null) {
+ return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+ } else {
+ return jobManagerRunner.getJobManagerGateway().cancel(timeout);
+ }
+ }
+
+ @Override
public CompletableFuture<String> requestRestAddress(Time timeout) {
return restAddressFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 6aaf0b6..1f6d801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -48,7 +48,7 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
@RpcTimeout Time timeout);
/**
- * Lists the current set of submitted jobs.
+ * List the current set of submitted jobs.
*
* @param timeout RPC timeout
* @return A future collection of currently submitted jobs
@@ -56,6 +56,21 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
CompletableFuture<Collection<JobID>> listJobs(
@RpcTimeout Time timeout);
+ /**
+ * Cancel the given job.
+ *
+ * @param jobId identifying the job to cancel
+ * @param timeout of the operation
+ * @return A future acknowledge if the cancellation succeeded
+ */
+ CompletableFuture<Acknowledge> cancelJob(JobID jobId, @RpcTimeout Time timeout);
+
+ /**
+ * Request the cluster overview.
+ *
+ * @param timeout of the operation
+ * @return Future {@link StatusOverview} containing the cluster information
+ */
CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);
CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 17f5616..d56eb17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
@@ -41,6 +42,9 @@ import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FileUtils;
@@ -114,6 +118,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
true,
true));
+ LegacyRestHandlerAdapter<DispatcherGateway, EmptyResponseBody, JobMessageParameters> jobCancellationHandler = new LegacyRestHandlerAdapter<>(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ JobCancellationHeaders.getInstance(),
+ new JobCancellationHandler(
+ executor,
+ timeout));
+
LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
leaderRetriever,
@@ -142,6 +155,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
+ handlers.add(Tuple2.of(JobCancellationHeaders.getInstance(), jobCancellationHandler));
optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 19fe4a6..a9e5cd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -357,6 +357,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
// RPC methods
//----------------------------------------------------------------------------------------------
+ @Override
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ executionGraph.cancel();
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
/**
* Updates the task execution state for a given task.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index c2fba47..946bb5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -55,6 +55,14 @@ import java.util.concurrent.CompletableFuture;
public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
/**
+ * Cancels the currently executed job.
+ *
+ * @param timeout of this operation
+ * @return Future acknowledge of the operation
+ */
+ CompletableFuture<Acknowledge> cancel(@RpcTimeout Time timeout);
+
+ /**
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
@@ -64,8 +72,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
final TaskExecutionState taskExecutionState);
/**
- * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
- * as a {@link SerializedInputSplit} message.
+ * Requests the next input split for the {@link ExecutionJobVertex}.
+ * The next input split is sent back to the sender as a
+ * {@link SerializedInputSplit} message.
*
* @param vertexID The job vertex id
* @param executionAttempt The execution attempt id
@@ -76,8 +85,8 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
final ExecutionAttemptID executionAttempt);
/**
- * Requests the current state of the partition.
- * The state of a partition is currently bound to the state of the producing execution.
+ * Requests the current state of the partition. The state of a
+ * partition is currently bound to the state of the producing execution.
*
* @param intermediateResultId The execution attempt ID of the task requesting the partition state.
* @param partitionId The partition ID of the partition to request the state of.
@@ -89,12 +98,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
/**
* Notifies the JobManager about available data for a produced partition.
- * <p>
- * There is a call to this method for each {@link ExecutionVertex} instance once per produced
+ *
+ * <p>There is a call to this method for each {@link ExecutionVertex} instance once per produced
* {@link ResultPartition} instance, either when first producing data (for pipelined executions)
* or when all data has been produced (for staged executions).
- * <p>
- * The JobManager then can decide when to schedule the partition consumers of the given session.
+ *
+ * <p>The JobManager then can decide when to schedule the partition consumers of the given session.
*
* @param partitionID The partition which has already produced data
* @param timeout before the rpc call fails
@@ -132,6 +141,8 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName);
/**
+ * Notifies that queryable state has been registered.
+ *
* @param jobVertexId JobVertexID the KvState instance belongs to.
* @param keyGroupRange Key group range the KvState instance belongs to.
* @param registrationName Name under which the KvState has been registered.
@@ -146,6 +157,8 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
final KvStateServerAddress kvStateServerAddress);
/**
+ * Notifies that queryable state has been unregistered.
+ *
* @param jobVertexId JobVertexID the KvState instance belongs to.
* @param keyGroupRange Key group index the KvState instance belongs to.
* @param registrationName Name under which the KvState has been registered.
@@ -161,7 +174,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
CompletableFuture<ClassloadingProps> requestClassloadingProps();
/**
- * Offer the given slots to the job manager. The response contains the set of accepted slots.
+ * Offers the given slots to the job manager. The response contains the set of accepted slots.
*
* @param taskManagerId identifying the task manager
* @param slots to offer to the job manager
@@ -174,7 +187,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
@RpcTimeout final Time timeout);
/**
- * Fail the slot with the given allocation id and cause.
+ * Fails the slot with the given allocation id and cause.
*
* @param taskManagerId identifying the task manager
* @param allocationId identifying the slot to fail
@@ -185,7 +198,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
final Exception cause);
/**
- * Register the task manager at the job manager.
+ * Registers the task manager at the job manager.
*
* @param taskManagerRpcAddress the rpc address of the task manager
* @param taskManagerLocation location of the task manager
@@ -198,14 +211,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
@RpcTimeout final Time timeout);
/**
- * Send the heartbeat to job manager from task manager
+ * Sends the heartbeat to job manager from task manager
*
* @param resourceID unique id of the task manager
*/
void heartbeatFromTaskManager(final ResourceID resourceID);
/**
- * Heartbeat request from the resource manager
+ * Sends heartbeat request from the resource manager
*
* @param resourceID unique id of the resource manager
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
new file mode 100644
index 0000000..95686ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which is returned if a Flink job could not be found.
+ */
+public class FlinkJobNotFoundException extends FlinkException {
+
+ private static final long serialVersionUID = -7803390762010615384L;
+
+ public FlinkJobNotFoundException(JobID jobId) {
+ super("Could not find Flink job (" + jobId + ").");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
index 8987d75..fd1b22b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
@@ -25,7 +25,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
*/
public enum HttpMethodWrapper {
GET(HttpMethod.GET),
- POST(HttpMethod.POST);
+ POST(HttpMethod.POST),
+ DELETE(HttpMethod.DELETE),
+ PATCH(HttpMethod.PATCH);
private HttpMethod nettyHttpMethod;
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index ec6e5b0..d09aad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -256,6 +256,14 @@ public abstract class RestServerEndpoint {
case POST:
router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
break;
+ case DELETE:
+ router.DELETE(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ break;
+ case PATCH:
+ router.PATCH(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+ break;
+ default:
+ throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 948ea07..ee24dce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
@@ -140,11 +141,17 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
response = FutureUtils.completedExceptionally(e);
}
- response.whenComplete((P resp, Throwable error) -> {
- if (error != null) {
+ response.whenComplete((P resp, Throwable throwable) -> {
+ if (throwable != null) {
+
+ Throwable error = ExceptionUtils.stripCompletionException(throwable);
+
if (error instanceof RestHandlerException) {
- RestHandlerException rhe = (RestHandlerException) error;
- HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus());
+ final RestHandlerException rhe = (RestHandlerException) error;
+
+ log.error("Exception occurred in REST handler.", error);
+
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
} else {
log.error("Implementation error: Unhandled exception.", error);
HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index 4cbb542..7ae8939 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -18,24 +18,26 @@
package org.apache.flink.runtime.rest.handler;
+import org.apache.flink.util.FlinkException;
+
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* An exception that is thrown if the failure of a REST operation was detected by a handler.
*/
-public class RestHandlerException extends Exception {
+public class RestHandlerException extends FlinkException {
private static final long serialVersionUID = -1358206297964070876L;
- private final String errorMessage;
private final int responseCode;
public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus) {
- this.errorMessage = errorMessage;
+ super(errorMessage);
this.responseCode = httpResponseStatus.code();
}
- public String getErrorMessage() {
- return errorMessage;
+ public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus, Throwable cause) {
+ super(errorMessage, cause);
+ this.responseCode = httpResponseStatus.code();
}
public HttpResponseStatus getHttpResponseStatus() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
index 9e9849f..7b5bdc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
@@ -20,20 +20,34 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
/**
* Request handler for the CANCEL request.
*/
-public class JobCancellationHandler extends AbstractJsonRequestHandler {
+public class JobCancellationHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, EmptyResponseBody, JobMessageParameters> {
private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
@@ -70,4 +84,37 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
},
executor);
}
+
+ @Override
+ public CompletableFuture<EmptyResponseBody> handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, DispatcherGateway gateway) {
+ final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+
+ CompletableFuture<Acknowledge> cancelFuture = gateway.cancelJob(jobId, timeout);
+
+ return cancelFuture.handle(
+ (Acknowledge ack, Throwable throwable) -> {
+ if (throwable != null) {
+ Throwable error = ExceptionUtils.stripCompletionException(throwable);
+
+ if (error instanceof TimeoutException) {
+ throw new CompletionException(
+ new RestHandlerException(
+ "Job cancellation timed out.",
+ HttpResponseStatus.REQUEST_TIMEOUT, error));
+ } else if (error instanceof FlinkJobNotFoundException) {
+ throw new CompletionException(
+ new RestHandlerException(
+ "Job could not be found.",
+ HttpResponseStatus.NOT_FOUND, error));
+ } else {
+ throw new CompletionException(
+ new RestHandlerException(
+ "Job cancellation failed: " + error.getMessage(),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR, error));
+ }
+ } else {
+ return EmptyResponseBody.getInstance();
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java
new file mode 100644
index 0000000..8dc4787
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyResponseBody.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Empty {@link ResponseBody} implementation.
+ */
+public class EmptyResponseBody implements ResponseBody {
+
+ private static final EmptyResponseBody INSTANCE = new EmptyResponseBody();
+
+ private EmptyResponseBody() {}
+
+ private Object readResolve() {
+ return INSTANCE;
+ }
+
+ public static EmptyResponseBody getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
new file mode 100644
index 0000000..82f022b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobCancellationHandler}.
+ */
+public class JobCancellationHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, JobMessageParameters> {
+
+ public static final String URL = "/jobs/:jobid";
+
+ private static final JobCancellationHeaders INSTANCE = new JobCancellationHeaders();
+
+ private JobCancellationHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.ACCEPTED;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.PATCH;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static JobCancellationHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
new file mode 100644
index 0000000..a4ae0f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIDPathParameter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Path parameter identifying jobs.
+ */
+public class JobIDPathParameter extends MessagePathParameter<JobID> {
+
+ private static final String JOB_ID = "jobid";
+
+ public JobIDPathParameter() {
+ super(JOB_ID);
+ }
+
+ @Override
+ protected JobID convertFromString(String value) {
+ return JobID.fromHexString(value);
+ }
+
+ @Override
+ protected String convertToString(JobID value) {
+ return value.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ea4db1a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
new file mode 100644
index 0000000..d77a29f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * <p>A job related REST handler always requires a {@link JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
+
+ private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.singleton(jobPathParameter);
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.emptyList();
+ }
+}