You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/03 19:16:06 UTC
[3/3] flink git commit: [FLINK-9289][rest] Rework JobSubmitHandler to
accept jar files
[FLINK-9289][rest] Rework JobSubmitHandler to accept jar files
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/797709cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/797709cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/797709cb
Branch: refs/heads/release-1.5
Commit: 797709cb2466610b1d5b05c12e43d3f7d4f70183
Parents: 06b9bf1
Author: zentol <ch...@apache.org>
Authored: Mon Jun 11 11:45:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 18:32:08 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/rest_dispatcher.html | 234 ++++++++-----------
.../client/program/rest/RestClusterClient.java | 120 +++++-----
.../program/rest/RestClusterClientTest.java | 23 --
.../webmonitor/handlers/JarUploadHandler.java | 5 +-
.../handlers/JarUploadHandlerTest.java | 2 +-
.../dispatcher/DispatcherRestEndpoint.java | 11 +-
.../flink/runtime/rest/AbstractHandler.java | 128 +++++-----
.../rest/handler/AbstractRestHandler.java | 6 +-
.../flink/runtime/rest/handler/FileUploads.java | 9 +-
.../runtime/rest/handler/HandlerRequest.java | 8 +-
.../rest/handler/job/BlobServerPortHandler.java | 66 ------
.../rest/handler/job/JobSubmitHandler.java | 130 +++++++++--
.../AbstractTaskManagerFileHandler.java | 4 +-
.../rest/messages/BlobServerPortHeaders.java | 74 ------
.../messages/BlobServerPortResponseBody.java | 57 -----
.../rest/messages/job/JobSubmitHeaders.java | 11 +-
.../rest/messages/job/JobSubmitRequestBody.java | 66 +++---
.../flink/runtime/rest/util/RestConstants.java | 6 +-
.../flink/runtime/rest/AbstractHandlerTest.java | 19 +-
.../runtime/rest/MultipartUploadResource.java | 7 +-
.../runtime/rest/RestServerEndpointITCase.java | 3 +-
.../runtime/rest/handler/FileUploadsTest.java | 6 +-
.../handler/job/BlobServerPortHandlerTest.java | 101 --------
.../rest/handler/job/JobSubmitHandlerTest.java | 181 +++++++++++---
.../messages/BlobServerPortResponseTest.java | 35 ---
.../rest/messages/JobSubmitRequestBodyTest.java | 6 +-
.../webmonitor/TestingDispatcherGateway.java | 203 ++++++++++++++++
.../webmonitor/TestingRestfulGateway.java | 30 +--
28 files changed, 810 insertions(+), 741 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index c74da9c..034a3d3 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -1,50 +1,6 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/blobserver/port</strong></td>
- </tr>
- <tr>
- <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
- <td class="text-left">Response code: <code>200 OK</code></td>
- </tr>
- <tr>
- <td colspan="2">Returns the port of blob server which can be used to upload jars.</td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#607508253">Request</button>
- <div id="607508253" class="collapse">
- <pre>
- <code>
-{} </code>
- </pre>
- </div>
- </td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#1913718109">Response</button>
- <div id="1913718109" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
- "properties" : {
- "port" : {
- "type" : "integer"
- }
- }
-} </code>
- </pre>
- </div>
- </td>
- </tr>
- </tbody>
-</table>
-<table class="table table-bordered">
- <tbody>
- <tr>
<td class="text-left" colspan="2"><strong>/cluster</strong></td>
</tr>
<tr>
@@ -226,19 +182,11 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#919877128">Request</button>
- <div id="919877128" class="collapse">
+ <button data-toggle="collapse" data-target="#-1290030289">Request</button>
+ <div id="-1290030289" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:FileUpload",
- "properties" : {
- "path" : {
- "type" : "string"
- }
- }
-} </code>
+{} </code>
</pre>
</div>
</td>
@@ -607,7 +555,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
<td class="text-left">Response code: <code>202 Accepted</code></td>
</tr>
<tr>
- <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client.</td>
+ <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client. This call expects amultipart/form-data request that consists of file uploads for the serialized JobGraph, jars anddistributed cache artifacts and an attribute named "request"for the JSON payload.</td>
</tr>
<tr>
<td colspan="2">
@@ -619,10 +567,13 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
"properties" : {
- "serializedJobGraph" : {
+ "jobGraphFileName" : {
+ "type" : "string"
+ },
+ "jobJarFileNames" : {
"type" : "array",
"items" : {
- "type" : "integer"
+ "type" : "string"
}
}
}
@@ -2461,79 +2412,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
- </tr>
- <tr>
- <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
- <td class="text-left">Response code: <code>200 OK</code></td>
- </tr>
- <tr>
- <td colspan="2">Returns user-defined accumulators of a task, aggregated across all subtasks.</td>
- </tr>
- <tr>
- <td colspan="2">Path parameters</td>
- </tr>
- <tr>
- <td colspan="2">
- <ul>
-<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
- </ul>
- </td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#485581006">Request</button>
- <div id="485581006" class="collapse">
- <pre>
- <code>
-{} </code>
- </pre>
- </div>
- </td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#-1070353054">Response</button>
- <div id="-1070353054" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo",
- "properties" : {
- "id" : {
- "type" : "string"
- },
- "user-accumulators" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
- "properties" : {
- "name" : {
- "type" : "string"
- },
- "type" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
- }
-} </code>
- </pre>
- </div>
- </td>
- </tr>
- </tbody>
-</table>
-<table class="table table-bordered">
- <tbody>
- <tr>
<td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
</tr>
<tr>
@@ -2675,6 +2553,100 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
<table class="table table-bordered">
<tbody>
<tr>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/accumulators</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Returns all user-defined accumulators for all subtasks of a task.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-886388859">Request</button>
+ <div id="-886388859" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#112317594">Response</button>
+ <div id="112317594" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo",
+ "properties" : {
+ "id" : {
+ "type" : "any"
+ },
+ "parallelism" : {
+ "type" : "integer"
+ },
+ "subtasks" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo",
+ "properties" : {
+ "subtask" : {
+ "type" : "integer"
+ },
+ "attempt" : {
+ "type" : "integer"
+ },
+ "host" : {
+ "type" : "string"
+ },
+ "user-accumulators" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
+ "properties" : {
+ "name" : {
+ "type" : "string"
+ },
+ "type" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
<td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 67233ea..adfd1df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -23,14 +23,14 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.NewClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -49,8 +50,6 @@ import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeader
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -87,10 +86,10 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ScalaUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
@@ -102,20 +101,20 @@ import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import akka.actor.AddressFromURIString;
-
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.ObjectOutputStream;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -313,41 +312,55 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
- log.info("Requesting blob server port.");
- CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
-
- CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
- getDispatcherAddress(),
- (BlobServerPortResponseBody response, String dispatcherAddress) -> {
- log.info("Uploading jar files.");
- final int blobServerPort = response.port;
- final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
- final List<PermanentBlobKey> keys;
- try {
- keys = BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+ CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
+ try {
+ final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
}
+ return jobGraphFile;
+ } catch (IOException e) {
+ throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
+ }
+ }, executorService);
- for (PermanentBlobKey key : keys) {
- jobGraph.addBlob(key);
- }
+ CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
+ List<String> jarFileNames = new ArrayList<>(8);
+ Collection<FileUpload> filesToUpload = new ArrayList<>(8);
- return jobGraph;
- });
+ filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
- CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
- (JobGraph jobGraphToSubmit) -> {
- log.info("Submitting job graph.");
+ for (Path jar : jobGraph.getUserJars()) {
+ jarFileNames.add(jar.getName());
+ filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+ }
- try {
- return sendRequest(
- JobSubmitHeaders.getInstance(),
- new JobSubmitRequestBody(jobGraph));
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
- }
- });
+ final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
+ jobGraphFile.getFileName().toString(),
+ jarFileNames
+ );
+
+ return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
+ });
+
+ final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
+ requestAndFileUploads -> sendRetriableRequest(
+ JobSubmitHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ requestAndFileUploads.f0,
+ requestAndFileUploads.f1,
+ isConnectionProblemOrServiceUnavailable())
+ );
+
+ submissionFuture
+ .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
+ .thenAccept(jobGraphFile -> {
+ try {
+ Files.delete(jobGraphFile);
+ } catch (IOException e) {
+ log.warn("Could not delete temporary file {}.", jobGraphFile, e);
+ }
+ });
return submissionFuture
.thenApply(
@@ -679,9 +692,14 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
+ return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate);
+ }
+
+ private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+ sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
try {
- return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+ return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload);
} catch (IOException e) {
throw new CompletionException(e);
}
@@ -739,26 +757,4 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
}
}, executorService);
}
-
- private CompletableFuture<String> getDispatcherAddress() {
- return FutureUtils.orTimeout(
- dispatcherLeaderRetriever.getLeaderFuture(),
- restClusterClientConfiguration.getAwaitLeaderTimeout(),
- TimeUnit.MILLISECONDS)
- .thenApplyAsync(leaderAddressSessionId -> {
- final String address = leaderAddressSessionId.f0;
- final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
-
- return host.orElseGet(() -> {
- // if the dispatcher address does not contain a host part, then assume it's running
- // on the same machine as the client
- log.info("The dispatcher seems to run without remoting enabled. This indicates that we are " +
- "in a test. This can only work if the RestClusterClient runs on the same machine. " +
- "Assuming, therefore, 'localhost' as the host.");
-
- return "localhost";
- });
- }, executorService);
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f025d67..75f16c0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -53,8 +53,6 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -226,7 +224,6 @@ public class RestClusterClientTest extends TestLogger {
@Test
public void testJobSubmitCancelStop() throws Exception {
- TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
TestJobExecutionResultHandler testJobExecutionResultHandler =
@@ -237,15 +234,12 @@ public class RestClusterClientTest extends TestLogger {
.build()));
try (TestRestServerEndpoint ignored = createRestServerEndpoint(
- portHandler,
submitHandler,
terminationHandler,
testJobExecutionResultHandler)) {
- Assert.assertFalse(portHandler.portRetrieved);
Assert.assertFalse(submitHandler.jobSubmitted);
restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
- Assert.assertTrue(portHandler.portRetrieved);
Assert.assertTrue(submitHandler.jobSubmitted);
Assert.assertFalse(terminationHandler.jobCanceled);
@@ -264,11 +258,9 @@ public class RestClusterClientTest extends TestLogger {
@Test
public void testDetachedJobSubmission() throws Exception {
- final TestBlobServerPortHandler testBlobServerPortHandler = new TestBlobServerPortHandler();
final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
try (TestRestServerEndpoint ignored = createRestServerEndpoint(
- testBlobServerPortHandler,
testJobSubmitHandler)) {
restClusterClient.setDetached(true);
@@ -282,20 +274,6 @@ public class RestClusterClientTest extends TestLogger {
}
- private class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
- private volatile boolean portRetrieved = false;
-
- private TestBlobServerPortHandler() {
- super(BlobServerPortHeaders.getInstance());
- }
-
- @Override
- protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- portRetrieved = true;
- return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000));
- }
- }
-
private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private volatile boolean jobSubmitted = false;
@@ -390,7 +368,6 @@ public class RestClusterClientTest extends TestLogger {
try (TestRestServerEndpoint ignored = createRestServerEndpoint(
testJobExecutionResultHandler,
- new TestBlobServerPortHandler(),
new TestJobSubmitHandler())) {
JobExecutionResult jobExecutionResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index a1ef82b..83db224 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -32,6 +32,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import javax.annotation.Nonnull;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -70,11 +71,11 @@ public class JarUploadHandler extends
protected CompletableFuture<JarUploadResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
@Nonnull final RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Collection<File> uploadedFiles = request.getUploadedFiles();
if (uploadedFiles.size() != 1) {
throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
}
- final Path fileUpload = uploadedFiles.iterator().next();
+ final Path fileUpload = uploadedFiles.iterator().next().toPath();
return CompletableFuture.supplyAsync(() -> {
if (!fileUpload.getFileName().toString().endsWith(".jar")) {
throw new CompletionException(new RestHandlerException(
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index 812d4c6..c9e25ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -131,6 +131,6 @@ public class JarUploadHandlerTest extends TestLogger {
EmptyMessageParameters.getInstance(),
Collections.emptyMap(),
Collections.emptyMap(),
- Collections.singleton(uploadedFile));
+ Collections.singleton(uploadedFile.toFile()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8072cf4..4279330 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -88,17 +87,12 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
final Time timeout = restConfiguration.getTimeout();
- BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(
- restAddressFuture,
- leaderRetriever,
- timeout,
- responseHeaders);
-
JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
restAddressFuture,
leaderRetriever,
timeout,
- responseHeaders);
+ responseHeaders,
+ executor);
if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
try {
@@ -125,7 +119,6 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
log.info("Web-based job submission is not enabled.");
}
- handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
return handlers;
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 41d242b..e8d9384 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -89,87 +91,73 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
final HttpRequest httpRequest = routed.request();
+ FileUploads uploadedFiles = null;
try {
if (!(httpRequest instanceof FullHttpRequest)) {
// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
// FullHttpRequests.
log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody("Bad request received."),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
+ throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST);
}
final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
- try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+ uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx);
- if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody("File uploads not allowed."),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
- }
+ if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
+ throw new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST);
+ }
- R request;
- if (msgContent.capacity() == 0) {
- try {
- request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
- } catch (JsonParseException | JsonMappingException je) {
- log.error("Request did not conform to expected format.", je);
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody("Bad request received."),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
- }
- } else {
- try {
- ByteBufInputStream in = new ByteBufInputStream(msgContent);
- request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
- } catch (JsonParseException | JsonMappingException je) {
- log.error("Failed to read request.", je);
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
- }
+ R request;
+ if (msgContent.capacity() == 0) {
+ try {
+ request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+ } catch (JsonParseException | JsonMappingException je) {
+ log.error("Request did not conform to expected format.", je);
+ throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
}
-
- final HandlerRequest<R, M> handlerRequest;
-
+ } else {
try {
- handlerRequest = new HandlerRequest<>(request, untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams(), uploadedFiles.getUploadedFiles());
- } catch (HandlerRequestException hre) {
- log.error("Could not create the handler request.", hre);
-
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+ ByteBufInputStream in = new ByteBufInputStream(msgContent);
+ request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
+ } catch (JsonParseException | JsonMappingException je) {
+ log.error("Failed to read request.", je);
+ throw new RestHandlerException(
+ String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
+ je);
}
+ }
+
+ final HandlerRequest<R, M> handlerRequest;
- respondToRequest(
- ctx,
- httpRequest,
- handlerRequest,
- gateway);
+ try {
+ handlerRequest = new HandlerRequest<>(request, untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams(), uploadedFiles.getUploadedFiles());
+ } catch (HandlerRequestException hre) {
+ log.error("Could not create the handler request.", hre);
+ throw new RestHandlerException(
+ String.format("Bad request, could not parse parameters: %s", hre.getMessage()),
+ HttpResponseStatus.BAD_REQUEST,
+ hre);
}
+ CompletableFuture<Void> requestProcessingFuture = respondToRequest(
+ ctx,
+ httpRequest,
+ handlerRequest,
+ gateway);
+
+ final FileUploads finalUploadedFiles = uploadedFiles;
+ requestProcessingFuture
+ .whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+ } catch (RestHandlerException rhe) {
+ HandlerUtils.sendErrorResponse(
+ ctx,
+ httpRequest,
+ new ErrorResponseBody(rhe.getMessage()),
+ rhe.getHttpResponseStatus(),
+ responseHeaders);
+ cleanupFileUploads(uploadedFiles);
} catch (Throwable e) {
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(
@@ -178,6 +166,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
new ErrorResponseBody("Internal server error."),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
+ cleanupFileUploads(uploadedFiles);
+ }
+ }
+
+ private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
+ if (uploadedFiles != null) {
+ try {
+ uploadedFiles.close();
+ } catch (IOException e) {
+ log.warn("Could not cleanup uploaded files.", e);
+ }
}
}
@@ -188,9 +187,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
* @param httpRequest original http request
* @param handlerRequest typed handler request
* @param gateway leader gateway
+ * @return Future which is completed once the request has been processed
* @throws RestHandlerException if an exception occurred while responding
*/
- protected abstract void respondToRequest(
+ protected abstract CompletableFuture<Void> respondToRequest(
ChannelHandlerContext ctx,
HttpRequest httpRequest,
HandlerRequest<R, M> handlerRequest,
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 448711b..e4cec08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
}
@Override
- protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
+ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
CompletableFuture<P> response;
try {
@@ -79,7 +79,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
response = FutureUtils.completedExceptionally(e);
}
- response.whenComplete((P resp, Throwable throwable) -> {
+ return response.whenComplete((P resp, Throwable throwable) -> {
if (throwable != null) {
Throwable error = ExceptionUtils.stripCompletionException(throwable);
@@ -105,7 +105,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
messageHeaders.getResponseStatusCode(),
responseHeaders);
}
- });
+ }).thenApply(ignored -> null);
}
private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest, RestHandlerException rhe) {
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
index 31ac47bb..b233cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@@ -58,7 +59,7 @@ public final class FileUploads implements AutoCloseable {
this.uploadDirectory = uploadDirectory;
}
- public Collection<Path> getUploadedFiles() throws IOException {
+ public Collection<File> getUploadedFiles() throws IOException {
if (uploadDirectory == null) {
return Collections.emptyList();
}
@@ -78,9 +79,9 @@ public final class FileUploads implements AutoCloseable {
private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
- private final Collection<Path> files = new ArrayList<>(4);
+ private final Collection<File> files = new ArrayList<>(4);
- Collection<Path> getContainedFiles() {
+ Collection<File> getContainedFiles() {
return files;
}
@@ -90,7 +91,7 @@ public final class FileUploads implements AutoCloseable {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult result = super.visitFile(file, attrs);
- files.add(file);
+ files.add(file.toFile());
return result;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 7e93556..990dae5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
-import java.nio.file.Path;
+import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -43,7 +43,7 @@ import java.util.StringJoiner;
public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
private final R requestBody;
- private final Collection<Path> uploadedFiles;
+ private final Collection<File> uploadedFiles;
private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
@@ -55,7 +55,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
this(requestBody, messageParameters, receivedPathParameters, receivedQueryParameters, Collections.emptyList());
}
- public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<Path> uploadedFiles) throws HandlerRequestException {
+ public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<File> uploadedFiles) throws HandlerRequestException {
this.requestBody = Preconditions.checkNotNull(requestBody);
this.uploadedFiles = Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
Preconditions.checkNotNull(messageParameters);
@@ -141,7 +141,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
}
@Nonnull
- public Collection<Path> getUploadedFiles() {
+ public Collection<File> getUploadedFiles() {
return uploadedFiles;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
deleted file mode 100644
index 4b5fa89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-
-/**
- * This handler can be used to retrieve the port that the blob server runs on.
- */
-public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
- public BlobServerPortHandler(
- CompletableFuture<String> localRestAddress,
- GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
- Time timeout,
- Map<String, String> headers) {
- super(localRestAddress, leaderRetriever, timeout, headers, BlobServerPortHeaders.getInstance());
- }
-
- @Override
- protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- return gateway
- .getBlobServerPort(timeout)
- .thenApply(BlobServerPortResponseBody::new)
- .exceptionally(error -> {
- throw new CompletionException(new RestHandlerException(
- "Failed to retrieve blob server port.",
- HttpResponseStatus.INTERNAL_SERVER_ERROR,
- ExceptionUtils.stripCompletionException(error)));
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index af04629..dfa9591 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -19,8 +19,13 @@
package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -34,42 +39,139 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import javax.annotation.Nonnull;
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
/**
* This handler can be used to submit jobs to a Flink cluster.
*/
public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+ private static final String FILE_TYPE_JOB_GRAPH = "JobGraph";
+ private static final String FILE_TYPE_JAR = "Jar";
+
+ private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Time timeout,
- Map<String, String> headers) {
+ Map<String, String> headers,
+ Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+ this.executor = executor;
}
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- JobGraph jobGraph;
- try {
- ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
- jobGraph = (JobGraph) objectIn.readObject();
- } catch (Exception e) {
+ final Collection<File> uploadedFiles = request.getUploadedFiles();
+ final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+ File::getName,
+ Path::fromLocalFile
+ ));
+
+ if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
- "Failed to deserialize JobGraph.",
- HttpResponseStatus.BAD_REQUEST,
- e);
+ String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+ uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+ nameToFile.size(),
+ uploadedFiles.size()),
+ HttpResponseStatus.BAD_REQUEST
+ );
}
- return gateway.submitJob(jobGraph, timeout)
- .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+ final JobSubmitRequestBody requestBody = request.getRequestBody();
+
+ CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
+
+ Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
+
+ CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles);
+
+ CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
+
+ return jobSubmissionFuture.thenCombine(jobGraphFuture,
+ (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
.exceptionally(exception -> {
- throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
- });
+ throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
+ });
+ }
+
+ private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody, Map<String, Path> nameToFile) throws MissingFileException {
+ final Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
+
+ return CompletableFuture.supplyAsync(() -> {
+ JobGraph jobGraph;
+ try (ObjectInputStream objectIn = new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))) {
+ jobGraph = (JobGraph) objectIn.readObject();
+ } catch (Exception e) {
+ throw new CompletionException(new RestHandlerException(
+ "Failed to deserialize JobGraph.",
+ HttpResponseStatus.BAD_REQUEST,
+ e));
+ }
+ return jobGraph;
+ }, executor);
+ }
+
+ private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFileMap) throws MissingFileException {
+ Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size());
+ for (String jarFileName : jarFileNames) {
+ Path jarFile = getPathAndAssertUpload(jarFileName, FILE_TYPE_JAR, nameToFileMap);
+ jarFiles.add(new Path(jarFile.toString()));
+ }
+ return jarFiles;
+ }
+
+ private CompletableFuture<JobGraph> uploadJobGraphFiles(
+ DispatcherGateway gateway,
+ CompletableFuture<JobGraph> jobGraphFuture,
+ Collection<Path> jarFiles) {
+ CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+ return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
+ final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+ if (!jarFiles.isEmpty()) {
+ try {
+ final List<PermanentBlobKey> permanentBlobKeys = BlobClient.uploadJarFiles(address, new Configuration(), jobGraph.getJobID(), new ArrayList<>(jarFiles));
+ for (PermanentBlobKey blobKey : permanentBlobKeys) {
+ jobGraph.addBlob(blobKey);
+ }
+ } catch (IOException e) {
+ throw new CompletionException(new RestHandlerException(
+ "Could not upload job files.",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ e));
+ }
+ }
+ return jobGraph;
+ });
+ }
+
+ private static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> uploadedFiles) throws MissingFileException {
+ final Path file = uploadedFiles.get(fileName);
+ if (file == null) {
+ throw new MissingFileException(type, fileName);
+ }
+ return file;
+ }
+
+ private static final class MissingFileException extends RestHandlerException {
+
+ private static final long serialVersionUID = -7954810495610194965L;
+
+ MissingFileException(String type, String fileName) {
+ super(type + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 5b5d97d..265813f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -117,7 +117,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
}
@Override
- protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
final ResourceID taskManagerId = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
final CompletableFuture<TransientBlobKey> blobKeyFuture;
@@ -152,7 +152,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
},
ctx.executor());
- resultFuture.whenComplete(
+ return resultFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
log.debug("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
deleted file mode 100644
index a845de3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * These headers define the protocol for querying the port of the blob server.
- */
-public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
- private static final String URL = "/blobserver/port";
- private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders();
-
- private BlobServerPortHeaders() {
- }
-
- @Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.GET;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return URL;
- }
-
- @Override
- public Class<BlobServerPortResponseBody> getResponseClass() {
- return BlobServerPortResponseBody.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public EmptyMessageParameters getUnresolvedMessageParameters() {
- return EmptyMessageParameters.getInstance();
- }
-
- public static BlobServerPortHeaders getInstance() {
- return INSTANCE;
- }
-
- @Override
- public String getDescription() {
- return "Returns the port of blob server which can be used to upload jars.";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
deleted file mode 100644
index 895ecf3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Response containing the blob server port.
- */
-public final class BlobServerPortResponseBody implements ResponseBody {
-
- static final String FIELD_NAME_PORT = "port";
-
- /**
- * The port of the blob server.
- */
- @JsonProperty(FIELD_NAME_PORT)
- public final int port;
-
- @JsonCreator
- public BlobServerPortResponseBody(
- @JsonProperty(FIELD_NAME_PORT) int port) {
-
- this.port = port;
- }
-
- @Override
- public int hashCode() {
- return 67 * port;
- }
-
- @Override
- public boolean equals(Object object) {
- if (object instanceof BlobServerPortResponseBody) {
- BlobServerPortResponseBody other = (BlobServerPortResponseBody) object;
- return this.port == other.port;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
index 88f53f2..42f64b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.messages.job;
+import org.apache.flink.runtime.rest.FileUploadHandler;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -71,6 +72,14 @@ public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, Jo
@Override
public String getDescription() {
- return "Submits a job. This call is primarily intended to be used by the Flink client.";
+ return "Submits a job. This call is primarily intended to be used by the Flink client. This call expects a" +
+ "multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and" +
+ "distributed cache artifacts and an attribute named \"" + FileUploadHandler.HTTP_ATTRIBUTE_REQUEST + "\"for " +
+ "the JSON payload.";
+ }
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
index 3f550f0..8829bc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -18,64 +18,62 @@
package org.apache.flink.runtime.rest.messages.job;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
/**
* Request for submitting a job.
*
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
*/
public final class JobSubmitRequestBody implements RequestBody {
- private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+ private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+ private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
- /**
- * The serialized job graph.
- */
- @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
- public final byte[] serializedJobGraph;
+ @JsonProperty(FIELD_NAME_JOB_GRAPH)
+ public final String jobGraphFileName;
- public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
- this(serializeJobGraph(jobGraph));
- }
+ @JsonProperty(FIELD_NAME_JOB_JARS)
+ public final Collection<String> jarFileNames;
@JsonCreator
public JobSubmitRequestBody(
- @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
- this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+ @JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName,
+ @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> jarFileNames) {
+ this.jobGraphFileName = jobGraphFileName;
+ this.jarFileNames = jarFileNames;
}
@Override
- public int hashCode() {
- return 71 * Arrays.hashCode(this.serializedJobGraph);
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+ return Objects.equals(jobGraphFileName, that.jobGraphFileName) &&
+ Objects.equals(jarFileNames, that.jarFileNames);
}
@Override
- public boolean equals(Object object) {
- if (object instanceof JobSubmitRequestBody) {
- JobSubmitRequestBody other = (JobSubmitRequestBody) object;
- return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
- }
- return false;
+ public int hashCode() {
+ return Objects.hash(jobGraphFileName, jarFileNames);
}
- private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
- ObjectOutputStream out = new ObjectOutputStream(baos);
-
- out.writeObject(jobGraph);
-
- return baos.toByteArray();
- }
+ @Override
+ public String toString() {
+ return "JobSubmitRequestBody{" +
+ "jobGraphFileName='" + jobGraphFileName + '\'' +
+ ", jarFileNames=" + jarFileNames +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
index c5135bf..a538e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
@@ -23,6 +23,10 @@ import org.apache.flink.configuration.ConfigConstants;
/**
* This class contains constants to be used by rest components.
*/
-public class RestConstants {
+public enum RestConstants {
+ ;
+
public static final String REST_CONTENT_TYPE = "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name();
+ public static final String CONTENT_TYPE_JAR = "application/java-archive";
+ public static final String CONTENT_TYPE_BINARY = "application/octet-stream";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
index ffcc72a..e759d30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
@@ -80,7 +80,8 @@ public class AbstractHandlerTest extends TestLogger {
final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
CompletableFuture.completedFuture(mockRestfulGateway);
- TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+ CompletableFuture<Void> requestProcessingCompleteFuture = new CompletableFuture<>();
+ TestHandler handler = new TestHandler(requestProcessingCompleteFuture, CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
HttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
@@ -99,6 +100,9 @@ public class AbstractHandlerTest extends TestLogger {
handler.respondAsLeader(context, routerRequest, mockRestfulGateway);
+ // the (asynchronous) request processing is not yet complete so the files should still exist
+ Assert.assertTrue(Files.exists(file));
+ requestProcessingCompleteFuture.complete(null);
Assert.assertFalse(Files.exists(file));
}
@@ -154,14 +158,16 @@ public class AbstractHandlerTest extends TestLogger {
}
private static class TestHandler extends AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
+ private final CompletableFuture<Void> completionFuture;
- protected TestHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
+ protected TestHandler(CompletableFuture<Void> completionFuture, @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
super(localAddressFuture, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE);
+ this.completionFuture = completionFuture;
}
@Override
- protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
-
+ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+ return completionFuture;
}
private enum TestHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
@@ -186,6 +192,11 @@ public class AbstractHandlerTest extends TestLogger {
public String getTargetRestEndpointURL() {
return "/test";
}
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return true;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 1311b80..0153d5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -66,6 +66,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertArrayEquals;
@@ -203,7 +204,7 @@ public class MultipartUploadResource extends ExternalResource {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles());
+ MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList()));
this.lastReceivedRequest = request.getRequestBody();
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
@@ -268,7 +269,7 @@ public class MultipartUploadResource extends ExternalResource {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Collection<Path> uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
if (!uploadedFiles.isEmpty()) {
throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
@@ -313,7 +314,7 @@ public class MultipartUploadResource extends ExternalResource {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- verifyFileUpload(expectedFiles, request.getUploadedFiles());
+ verifyFileUpload(expectedFiles, request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList()));
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index b9413ba..93dbb5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -85,6 +85,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.hamcrest.Matchers.containsString;
@@ -661,7 +662,7 @@ public class RestServerEndpointITCase extends TestLogger {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Collection<Path> uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
if (uploadedFiles.size() != 1) {
throw new RestHandlerException("Expected 1 file, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
index fb7faa3..2b14641 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
@@ -25,11 +25,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.stream.Collectors;
/**
* Tests for {@link FileUploads}.
@@ -64,7 +66,7 @@ public class FileUploadsTest extends TestLogger {
Files.createFile(tmp.resolve(subFile));
try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
- Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+ Collection<Path> detectedFiles = fileUploads.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
Assert.assertEquals(2, detectedFiles.size());
Assert.assertTrue(detectedFiles.contains(tmp.resolve(rootFile)));
@@ -80,7 +82,7 @@ public class FileUploadsTest extends TestLogger {
Files.createDirectory(tmp.resolve(rootDir));
try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
- Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+ Collection<File> detectedFiles = fileUploads.getUploadedFiles();
Assert.assertEquals(0, detectedFiles.size());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
deleted file mode 100644
index 15c2eb4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the {@link BlobServerPortHandler}.
- */
-public class BlobServerPortHandlerTest extends TestLogger {
- private static final int PORT = 64;
-
- @Test
- public void testPortRetrieval() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.getBlobServerPort(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(PORT));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-
- BlobServerPortHandler handler = new BlobServerPortHandler(
- CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
-
- BlobServerPortResponseBody portResponse = handler
- .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
- .get();
-
- Assert.assertEquals(PORT, portResponse.port);
- }
-
- @Test
- public void testPortRetrievalFailureHandling() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.getBlobServerPort(any(Time.class)))
- .thenReturn(FutureUtils.completedExceptionally(new TestException()));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-
- BlobServerPortHandler handler = new BlobServerPortHandler(
- CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
-
- try {
- handler
- .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
- .get();
- Assert.fail();
- } catch (ExecutionException ee) {
- RestHandlerException rhe = (RestHandlerException) ee.getCause();
-
- Assert.assertEquals(TestException.class, rhe.getCause().getClass());
- Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, rhe.getHttpResponseStatus());
- }
- }
-
- private static class TestException extends Exception {
- private static final long serialVersionUID = -7064446788277853899L;
- }
-}