You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/03 19:09:09 UTC

[4/5] flink git commit: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files

[FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files

This closes #6203.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a25cd3fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a25cd3fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a25cd3fe

Branch: refs/heads/master
Commit: a25cd3feddd19e75456db32a704ee5509e85dd47
Parents: 544bfb9
Author: zentol <ch...@apache.org>
Authored: Mon Jun 11 11:45:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 21:08:45 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/rest_dispatcher.html   | 249 +++++++++----------
 .../client/program/rest/RestClusterClient.java  | 122 ++++-----
 .../program/rest/RestClusterClientTest.java     |  23 --
 .../webmonitor/handlers/JarRunHandler.java      |   2 +-
 .../webmonitor/handlers/JarUploadHandler.java   |   5 +-
 .../handlers/JarUploadHandlerTest.java          |   2 +-
 .../flink/runtime/client/ClientUtils.java       |  57 +++--
 .../apache/flink/runtime/client/JobClient.java  |   2 +-
 .../client/JobSubmissionClientActor.java        |   2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  11 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   2 +-
 .../flink/runtime/rest/AbstractHandler.java     | 140 +++++------
 .../rest/handler/AbstractRestHandler.java       |   6 +-
 .../flink/runtime/rest/handler/FileUploads.java |   9 +-
 .../runtime/rest/handler/HandlerRequest.java    |   8 +-
 .../rest/handler/job/BlobServerPortHandler.java |  66 -----
 .../rest/handler/job/JobSubmitHandler.java      | 136 +++++++++-
 .../AbstractTaskManagerFileHandler.java         |   4 +-
 .../rest/messages/BlobServerPortHeaders.java    |  74 ------
 .../messages/BlobServerPortResponseBody.java    |  57 -----
 .../rest/messages/job/JobSubmitHeaders.java     |  11 +-
 .../rest/messages/job/JobSubmitRequestBody.java | 115 ++++++---
 .../flink/runtime/rest/util/RestConstants.java  |   6 +-
 .../flink/runtime/client/ClientUtilsTest.java   |   4 +-
 .../flink/runtime/rest/AbstractHandlerTest.java |  19 +-
 .../runtime/rest/MultipartUploadResource.java   |   7 +-
 .../runtime/rest/RestServerEndpointITCase.java  |   3 +-
 .../runtime/rest/handler/FileUploadsTest.java   |   6 +-
 .../handler/job/BlobServerPortHandlerTest.java  | 101 --------
 .../rest/handler/job/JobSubmitHandlerTest.java  | 188 ++++++++++++--
 .../messages/BlobServerPortResponseTest.java    |  35 ---
 .../rest/messages/JobSubmitRequestBodyTest.java |   9 +-
 .../webmonitor/TestingDispatcherGateway.java    | 203 +++++++++++++++
 .../webmonitor/TestingRestfulGateway.java       |  30 +--
 34 files changed, 951 insertions(+), 763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index c74da9c..6ed59be 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -1,50 +1,6 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/blobserver/port</strong></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns the port of blob server which can be used to upload jars.</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#607508253">Request</button>
-        <div id="607508253" class="collapse">
-          <pre>
-            <code>
-{}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#1913718109">Response</button>
-        <div id="1913718109" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
-  "properties" : {
-    "port" : {
-      "type" : "integer"
-    }
-  }
-}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
-<table class="table table-bordered">
-  <tbody>
-    <tr>
       <td class="text-left" colspan="2"><strong>/cluster</strong></td>
     </tr>
     <tr>
@@ -226,19 +182,11 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#919877128">Request</button>
-        <div id="919877128" class="collapse">
+        <button data-toggle="collapse" data-target="#-1290030289">Request</button>
+        <div id="-1290030289" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:FileUpload",
-  "properties" : {
-    "path" : {
-      "type" : "string"
-    }
-  }
-}            </code>
+{}            </code>
           </pre>
          </div>
       </td>
@@ -607,7 +555,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
       <td class="text-left">Response code: <code>202 Accepted</code></td>
     </tr>
     <tr>
-      <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client.</td>
+      <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client. This call expects a multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and distributed cache artifacts and an attribute named "request" for the JSON payload.</td>
     </tr>
     <tr>
       <td colspan="2">
@@ -619,10 +567,28 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
   "type" : "object",
   "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
   "properties" : {
-    "serializedJobGraph" : {
+    "jobGraphFileName" : {
+      "type" : "string"
+    },
+    "jobJarFileNames" : {
       "type" : "array",
       "items" : {
-        "type" : "integer"
+        "type" : "string"
+      }
+    },
+    "jobArtifactFileNames" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody:DistributedCacheFile",
+        "properties" : {
+          "entryName" : {
+            "type" : "string"
+          },
+          "fileName" : {
+            "type" : "string"
+          }
+        }
       }
     }
   }
@@ -2461,79 +2427,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns user-defined accumulators of a task, aggregated across all subtasks.</td>
-    </tr>
-    <tr>
-      <td colspan="2">Path parameters</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <ul>
-<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#485581006">Request</button>
-        <div id="485581006" class="collapse">
-          <pre>
-            <code>
-{}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1070353054">Response</button>
-        <div id="-1070353054" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo",
-  "properties" : {
-    "id" : {
-      "type" : "string"
-    },
-    "user-accumulators" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
-        "properties" : {
-          "name" : {
-            "type" : "string"
-          },
-          "type" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
-}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
-<table class="table table-bordered">
-  <tbody>
-    <tr>
       <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
     </tr>
     <tr>
@@ -2675,6 +2568,100 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
 <table class="table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/accumulators</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns all user-defined accumulators for all subtasks of a task.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-886388859">Request</button>
+        <div id="-886388859" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#112317594">Response</button>
+        <div id="112317594" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo",
+  "properties" : {
+    "id" : {
+      "type" : "any"
+    },
+    "parallelism" : {
+      "type" : "integer"
+    },
+    "subtasks" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo",
+        "properties" : {
+          "subtask" : {
+            "type" : "integer"
+          },
+          "attempt" : {
+            "type" : "integer"
+          },
+          "host" : {
+            "type" : "string"
+          },
+          "user-accumulators" : {
+            "type" : "array",
+            "items" : {
+              "type" : "object",
+              "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
+              "properties" : {
+                "name" : {
+                  "type" : "string"
+                },
+                "type" : {
+                  "type" : "string"
+                },
+                "value" : {
+                  "type" : "string"
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
     </tr>
     <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 85699d7..935a07f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -22,15 +22,16 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.NewClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.client.ClientUtils;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -49,8 +51,6 @@ import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeader
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -87,10 +87,10 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
 import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ScalaUtils;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
@@ -102,20 +102,20 @@ import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import akka.actor.AddressFromURIString;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.ObjectOutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -315,36 +315,61 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 		// we have to enable queued scheduling because slot will be allocated lazily
 		jobGraph.setAllowQueuedScheduling(true);
 
-		log.info("Requesting blob server port.");
-		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+		CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
+			try {
+				final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+				try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+					objectOut.writeObject(jobGraph);
+				}
+				return jobGraphFile;
+			} catch (IOException e) {
+				throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
+			}
+		}, executorService);
 
-		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
-			getDispatcherAddress(),
-			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
-				final int blobServerPort = response.port;
-				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+		CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
+			List<String> jarFileNames = new ArrayList<>(8);
+			List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
+			Collection<FileUpload> filesToUpload = new ArrayList<>(8);
 
-				try {
-					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-				} catch (Exception e) {
-					throw new CompletionException(e);
-				}
+			filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
 
-				return jobGraph;
-			});
+			for (Path jar : jobGraph.getUserJars()) {
+				jarFileNames.add(jar.getName());
+				filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+			}
 
-		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
-			(JobGraph jobGraphToSubmit) -> {
-				log.info("Submitting job graph.");
+			for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
+				artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+				filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
+			}
 
-				try {
-					return sendRequest(
-						JobSubmitHeaders.getInstance(),
-						new JobSubmitRequestBody(jobGraph));
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
-				}
-			});
+			final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
+				jobGraphFile.getFileName().toString(),
+				jarFileNames,
+				artifactFileNames);
+
+			return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
+		});
+
+		final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
+			requestAndFileUploads -> sendRetriableRequest(
+				JobSubmitHeaders.getInstance(),
+				EmptyMessageParameters.getInstance(),
+				requestAndFileUploads.f0,
+				requestAndFileUploads.f1,
+				isConnectionProblemOrServiceUnavailable())
+		);
+
+		submissionFuture
+			.thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
+			.thenAccept(jobGraphFile -> {
+			try {
+				Files.delete(jobGraphFile);
+			} catch (IOException e) {
+				log.warn("Could not delete temporary file {}.", jobGraphFile, e);
+			}
+		});
 
 		return submissionFuture
 			.thenApply(
@@ -676,9 +701,14 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 
 	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
 			sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
+		return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate);
+	}
+
+	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+	sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
 		return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
 			try {
-				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload);
 			} catch (IOException e) {
 				throw new CompletionException(e);
 			}
@@ -736,26 +766,4 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 				}
 			}, executorService);
 	}
-
-	private CompletableFuture<String> getDispatcherAddress() {
-		return FutureUtils.orTimeout(
-				dispatcherLeaderRetriever.getLeaderFuture(),
-				restClusterClientConfiguration.getAwaitLeaderTimeout(),
-				TimeUnit.MILLISECONDS)
-			.thenApplyAsync(leaderAddressSessionId -> {
-				final String address = leaderAddressSessionId.f0;
-				final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
-
-				return host.orElseGet(() -> {
-					// if the dispatcher address does not contain a host part, then assume it's running
-					// on the same machine as the client
-					log.info("The dispatcher seems to run without remoting enabled. This indicates that we are " +
-						"in a test. This can only work if the RestClusterClient runs on the same machine. " +
-						"Assuming, therefore, 'localhost' as the host.");
-
-					return "localhost";
-				});
-			}, executorService);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f025d67..75f16c0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -53,8 +53,6 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -226,7 +224,6 @@ public class RestClusterClientTest extends TestLogger {
 
 	@Test
 	public void testJobSubmitCancelStop() throws Exception {
-		TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
 		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
 		TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
 		TestJobExecutionResultHandler testJobExecutionResultHandler =
@@ -237,15 +234,12 @@ public class RestClusterClientTest extends TestLogger {
 					.build()));
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
-			portHandler,
 			submitHandler,
 			terminationHandler,
 			testJobExecutionResultHandler)) {
 
-			Assert.assertFalse(portHandler.portRetrieved);
 			Assert.assertFalse(submitHandler.jobSubmitted);
 			restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-			Assert.assertTrue(portHandler.portRetrieved);
 			Assert.assertTrue(submitHandler.jobSubmitted);
 
 			Assert.assertFalse(terminationHandler.jobCanceled);
@@ -264,11 +258,9 @@ public class RestClusterClientTest extends TestLogger {
 	@Test
 	public void testDetachedJobSubmission() throws Exception {
 
-		final TestBlobServerPortHandler testBlobServerPortHandler = new TestBlobServerPortHandler();
 		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
-			testBlobServerPortHandler,
 			testJobSubmitHandler)) {
 
 			restClusterClient.setDetached(true);
@@ -282,20 +274,6 @@ public class RestClusterClientTest extends TestLogger {
 
 	}
 
-	private class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-		private volatile boolean portRetrieved = false;
-
-		private TestBlobServerPortHandler() {
-			super(BlobServerPortHeaders.getInstance());
-		}
-
-		@Override
-		protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-			portRetrieved = true;
-			return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000));
-		}
-	}
-
 	private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 		private volatile boolean jobSubmitted = false;
 
@@ -390,7 +368,6 @@ public class RestClusterClientTest extends TestLogger {
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
 			testJobExecutionResultHandler,
-			new TestBlobServerPortHandler(),
 			new TestJobSubmitHandler())) {
 
 			JobExecutionResult jobExecutionResult;

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 10387c8..1e620d4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -114,7 +114,7 @@ public class JarRunHandler extends
 		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
 			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
 			try {
-				ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
+				ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
 			} catch (FlinkException e) {
 				throw new CompletionException(e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index a1ef82b..83db224 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -32,6 +32,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -70,11 +71,11 @@ public class JarUploadHandler extends
 	protected CompletableFuture<JarUploadResponseBody> handleRequest(
 			@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
 			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
-		Collection<Path> uploadedFiles = request.getUploadedFiles();
+		Collection<File> uploadedFiles = request.getUploadedFiles();
 		if (uploadedFiles.size() != 1) {
 			throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
 		}
-		final Path fileUpload = uploadedFiles.iterator().next();
+		final Path fileUpload = uploadedFiles.iterator().next().toPath();
 		return CompletableFuture.supplyAsync(() -> {
 			if (!fileUpload.getFileName().toString().endsWith(".jar")) {
 				throw new CompletionException(new RestHandlerException(

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index 812d4c6..c9e25ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -131,6 +131,6 @@ public class JarUploadHandlerTest extends TestLogger {
 			EmptyMessageParameters.getInstance(),
 			Collections.emptyMap(),
 			Collections.emptyMap(),
-			Collections.singleton(uploadedFile));
+			Collections.singleton(uploadedFile.toFile()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
index fc6a621..06baaaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -32,7 +31,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -43,20 +41,41 @@ public enum ClientUtils {
 	;
 
 	/**
-	 * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
-	 * the given {@link Supplier}.
+	 * Extracts all files required for the execution from the given {@link JobGraph} and uploads them using the {@link BlobClient}
+	 * from the given {@link Supplier}.
 	 *
 	 * @param jobGraph jobgraph requiring files
 	 * @param clientSupplier supplier of blob client to upload files with
-	 * @throws IOException if the upload fails
+	 * @throws FlinkException if the upload fails
 	 */
-	public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
+	public static void extractAndUploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
 		List<Path> userJars = jobGraph.getUserJars();
-		Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+		Collection<Tuple2<String, Path>> userArtifacts = jobGraph.getUserArtifacts().entrySet().stream()
+			.map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
+			.collect(Collectors.toList());
+
+		uploadJobGraphFiles(jobGraph, userJars, userArtifacts, clientSupplier);
+	}
+
+	/**
+	 * Uploads the given jars and artifacts required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
+	 * the given {@link Supplier}.
+	 *
+	 * @param jobGraph jobgraph requiring files
+	 * @param userJars jars to upload
+	 * @param userArtifacts artifacts to upload
+	 * @param clientSupplier supplier of blob client to upload files with
+	 * @throws FlinkException if the upload fails
+	 */
+	public static void uploadJobGraphFiles(
+			JobGraph jobGraph,
+			Collection<Path> userJars,
+			Collection<Tuple2<String, org.apache.flink.core.fs.Path>> userArtifacts,
+			SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
 		if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
 			try (BlobClient client = clientSupplier.get()) {
-				uploadAndSetUserJars(jobGraph, client);
-				uploadAndSetUserArtifacts(jobGraph, client);
+				uploadAndSetUserJars(jobGraph, userJars, client);
+				uploadAndSetUserArtifacts(jobGraph, userArtifacts, client);
 			} catch (IOException ioe) {
 				throw new FlinkException("Could not upload job files.", ioe);
 			}
@@ -64,15 +83,15 @@ public enum ClientUtils {
 	}
 
 	/**
-	 * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
-	 * and sets the appropriate blobkeys.
+	 * Uploads the given user jars using the given {@link BlobClient}, and sets the appropriate blobkeys on the given {@link JobGraph}.
 	 *
-	 * @param jobGraph   jobgraph requiring user jars
+	 * @param jobGraph jobgraph requiring user jars
+	 * @param userJars jars to upload
 	 * @param blobClient client to upload jars with
 	 * @throws IOException if the upload fails
 	 */
-	private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
-		Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+	private static void uploadAndSetUserJars(JobGraph jobGraph, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), userJars, blobClient);
 		setUserJarBlobKeys(blobKeys, jobGraph);
 	}
 
@@ -90,18 +109,14 @@ public enum ClientUtils {
 	}
 
 	/**
-	 * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
-	 * and sets the appropriate blobkeys.
+	 * Uploads the given user artifacts using the given {@link BlobClient}, and sets the appropriate blobkeys on the given {@link JobGraph}.
 	 *
 	 * @param jobGraph jobgraph requiring user artifacts
+	 * @param artifactPaths artifacts to upload
 	 * @param blobClient client to upload artifacts with
 	 * @throws IOException if the upload fails
 	 */
-	private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
-		Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream()
-			.map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
-			.collect(Collectors.toList());
-
+	private static void uploadAndSetUserArtifacts(JobGraph jobGraph, Collection<Tuple2<String, Path>> artifactPaths, BlobClient blobClient) throws IOException {
 		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
 		setUserArtifactBlobKeys(jobGraph, blobKeys);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 27da3b8..635def2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -424,7 +424,7 @@ public class JobClient {
 		}
 
 		try {
-			ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
+			ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
 		} catch (FlinkException e) {
 			throw new JobSubmissionException(jobGraph.getJobID(),
 					"Could not upload job files.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 2783b09..89978fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -155,7 +155,7 @@ public class JobSubmissionClientActor extends JobClientActor {
 		final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync(
 			(InetSocketAddress blobServerAddress) -> {
 				try {
-					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
+					ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
 				} catch (FlinkException e) {
 					throw new CompletionException(e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8072cf4..4279330 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -88,17 +87,12 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 
 		final Time timeout = restConfiguration.getTimeout();
 
-		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders);
-
 		JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
-			responseHeaders);
+			responseHeaders,
+			executor);
 
 		if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
 			try {
@@ -125,7 +119,6 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			log.info("Web-based job submission is not enabled.");
 		}
 
-		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
 
 		return handlers;

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4fab2b8..97ab5a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -676,7 +676,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
 		return blobServerAddressFuture.thenAccept(blobServerAddress -> {
 			try {
-				ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+				ClientUtils.extractAndUploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
 			} catch (FlinkException e) {
 				throw new CompletionException(e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 1e88425..0d8605a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -82,98 +84,84 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 	}
 
 	@Override
-	protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
+	protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) {
 		HttpRequest httpRequest = routedRequest.getRequest();
 		if (log.isTraceEnabled()) {
 			log.trace("Received request " + httpRequest.uri() + '.');
 		}
 
+		FileUploads uploadedFiles = null;
 		try {
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
 				log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
-				HandlerUtils.sendErrorResponse(
-					ctx,
-					httpRequest,
-					new ErrorResponseBody("Bad request received."),
-					HttpResponseStatus.BAD_REQUEST,
-					responseHeaders);
-				return;
+				throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST);
 			}
 
 			final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
 
-			try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+			uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx);
 
-				if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody("File uploads not allowed."),
-						HttpResponseStatus.BAD_REQUEST,
-						responseHeaders);
-					return;
-				}
+			if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
+				throw new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST);
+			}
 
-				R request;
-				if (msgContent.capacity() == 0) {
-					try {
-						request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-					} catch (JsonParseException | JsonMappingException je) {
-						log.error("Request did not conform to expected format.", je);
-						HandlerUtils.sendErrorResponse(
-							ctx,
-							httpRequest,
-							new ErrorResponseBody("Bad request received."),
-							HttpResponseStatus.BAD_REQUEST,
-							responseHeaders);
-						return;
-					}
-				} else {
-					try {
-						ByteBufInputStream in = new ByteBufInputStream(msgContent);
-						request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
-					} catch (JsonParseException | JsonMappingException je) {
-						log.error("Failed to read request.", je);
-						HandlerUtils.sendErrorResponse(
-							ctx,
-							httpRequest,
-							new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
-							HttpResponseStatus.BAD_REQUEST,
-							responseHeaders);
-						return;
-					}
+			R request;
+			if (msgContent.capacity() == 0) {
+				try {
+					request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+				} catch (JsonParseException | JsonMappingException je) {
+					log.error("Request did not conform to expected format.", je);
+					throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
 				}
-
-				final HandlerRequest<R, M> handlerRequest;
-
+			} else {
 				try {
-					handlerRequest = new HandlerRequest<R, M>(
-						request,
-						untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
-						routedRequest.getRouteResult().pathParams(),
-						routedRequest.getRouteResult().queryParams(),
-						uploadedFiles.getUploadedFiles());
-				} catch (HandlerRequestException hre) {
-					log.error("Could not create the handler request.", hre);
-
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+					ByteBufInputStream in = new ByteBufInputStream(msgContent);
+					request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
+				} catch (JsonParseException | JsonMappingException je) {
+					log.error("Failed to read request.", je);
+					throw new RestHandlerException(
+						String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
 						HttpResponseStatus.BAD_REQUEST,
-						responseHeaders);
-					return;
+						je);
 				}
+			}
 
-				respondToRequest(
-					ctx,
-					httpRequest,
-					handlerRequest,
-					gateway);
+			final HandlerRequest<R, M> handlerRequest;
+
+			try {
+				handlerRequest = new HandlerRequest<R, M>(
+					request,
+					untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
+					routedRequest.getRouteResult().pathParams(),
+					routedRequest.getRouteResult().queryParams(),
+					uploadedFiles.getUploadedFiles());
+			} catch (HandlerRequestException hre) {
+				log.error("Could not create the handler request.", hre);
+				throw new RestHandlerException(
+					String.format("Bad request, could not parse parameters: %s", hre.getMessage()),
+					HttpResponseStatus.BAD_REQUEST,
+					hre);
 			}
 
+			CompletableFuture<Void> requestProcessingFuture = respondToRequest(
+				ctx,
+				httpRequest,
+				handlerRequest,
+				gateway);
+
+			final FileUploads finalUploadedFiles = uploadedFiles;
+			requestProcessingFuture
+				.whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+		} catch (RestHandlerException rhe) {
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				httpRequest,
+				new ErrorResponseBody(rhe.getMessage()),
+				rhe.getHttpResponseStatus(),
+				responseHeaders);
+			cleanupFileUploads(uploadedFiles);
 		} catch (Throwable e) {
 			log.error("Request processing failed.", e);
 			HandlerUtils.sendErrorResponse(
@@ -182,6 +170,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				new ErrorResponseBody("Internal server error."),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				responseHeaders);
+			cleanupFileUploads(uploadedFiles);
+		}
+	}
+
+	private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
+		if (uploadedFiles != null) {
+			try {
+				uploadedFiles.close();
+			} catch (IOException e) {
+				log.warn("Could not cleanup uploaded files.", e);
+			}
 		}
 	}
 
@@ -192,9 +191,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 	 * @param httpRequest original http request
 	 * @param handlerRequest typed handler request
 	 * @param gateway leader gateway
+	 * @return Future which is completed once the request has been processed
 	 * @throws RestHandlerException if an exception occurred while responding
 	 */
-	protected abstract void respondToRequest(
+	protected abstract CompletableFuture<Void> respondToRequest(
 		ChannelHandlerContext ctx,
 		HttpRequest httpRequest,
 		HandlerRequest<R, M> handlerRequest,

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 448711b..e4cec08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 	}
 
 	@Override
-	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
+	protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
 		CompletableFuture<P> response;
 
 		try {
@@ -79,7 +79,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 			response = FutureUtils.completedExceptionally(e);
 		}
 
-		response.whenComplete((P resp, Throwable throwable) -> {
+		return response.whenComplete((P resp, Throwable throwable) -> {
 			if (throwable != null) {
 
 				Throwable error = ExceptionUtils.stripCompletionException(throwable);
@@ -105,7 +105,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 					messageHeaders.getResponseStatusCode(),
 					responseHeaders);
 			}
-		});
+		}).thenApply(ignored -> null);
 	}
 
 	private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest, RestHandlerException rhe) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
index 31ac47bb..b233cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
@@ -58,7 +59,7 @@ public final class FileUploads implements AutoCloseable {
 		this.uploadDirectory = uploadDirectory;
 	}
 
-	public Collection<Path> getUploadedFiles() throws IOException {
+	public Collection<File> getUploadedFiles() throws IOException {
 		if (uploadDirectory == null) {
 			return Collections.emptyList();
 		}
@@ -78,9 +79,9 @@ public final class FileUploads implements AutoCloseable {
 
 	private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
 
-		private final Collection<Path> files = new ArrayList<>(4);
+		private final Collection<File> files = new ArrayList<>(4);
 
-		Collection<Path> getContainedFiles() {
+		Collection<File> getContainedFiles() {
 			return files;
 		}
 
@@ -90,7 +91,7 @@ public final class FileUploads implements AutoCloseable {
 		@Override
 		public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
 			FileVisitResult result = super.visitFile(file, attrs);
-			files.add(file);
+			files.add(file.toFile());
 			return result;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 7e93556..990dae5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
-import java.nio.file.Path;
+import java.io.File;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +43,7 @@ import java.util.StringJoiner;
 public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
 
 	private final R requestBody;
-	private final Collection<Path> uploadedFiles;
+	private final Collection<File> uploadedFiles;
 	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
 	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
@@ -55,7 +55,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 		this(requestBody, messageParameters, receivedPathParameters, receivedQueryParameters, Collections.emptyList());
 	}
 
-	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<Path> uploadedFiles) throws HandlerRequestException {
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<File> uploadedFiles) throws HandlerRequestException {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
 		this.uploadedFiles = Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
 		Preconditions.checkNotNull(messageParameters);
@@ -141,7 +141,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 	}
 
 	@Nonnull
-	public Collection<Path> getUploadedFiles() {
+	public Collection<File> getUploadedFiles() {
 		return uploadedFiles;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
deleted file mode 100644
index 4b5fa89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-
-/**
- * This handler can be used to retrieve the port that the blob server runs on.
- */
-public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
-	public BlobServerPortHandler(
-			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
-			Time timeout,
-			Map<String, String> headers) {
-		super(localRestAddress, leaderRetriever, timeout, headers, BlobServerPortHeaders.getInstance());
-	}
-
-	@Override
-	protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-		return gateway
-			.getBlobServerPort(timeout)
-			.thenApply(BlobServerPortResponseBody::new)
-			.exceptionally(error -> {
-				throw new CompletionException(new RestHandlerException(
-					"Failed to retrieve blob server port.",
-					HttpResponseStatus.INTERNAL_SERVER_ERROR,
-					ExceptionUtils.stripCompletionException(error)));
-			});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index af04629..052b056 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -19,8 +19,14 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -29,47 +35,153 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 
+	private static final String FILE_TYPE_JOB_GRAPH = "JobGraph";
+	private static final String FILE_TYPE_JAR = "Jar";
+	private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+	private final Executor executor;
+
 	public JobSubmitHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			Time timeout,
-			Map<String, String> headers) {
+			Map<String, String> headers,
+			Executor executor) {
 		super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+		this.executor = executor; // runs the asynchronous JobGraph deserialization started in loadJobGraph
 	}
 
 	@Override
 	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-		JobGraph jobGraph;
-		try {
-			ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-			jobGraph = (JobGraph) objectIn.readObject();
-		} catch (Exception e) {
+		final Collection<File> uploadedFiles = request.getUploadedFiles();
+		final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+			File::getName,
+			Path::fromLocalFile,
+			(first, duplicate) -> first));
+		// The merge function keeps duplicate names from throwing IllegalStateException, so they reach the check below.
+		if (uploadedFiles.size() != nameToFile.size()) {
 			throw new RestHandlerException(
-				"Failed to deserialize JobGraph.",
-				HttpResponseStatus.BAD_REQUEST,
-				e);
+				String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+					uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+					nameToFile.size(),
+					uploadedFiles.size()),
+				HttpResponseStatus.BAD_REQUEST
+			);
 		}
 
-		return gateway.submitJob(jobGraph, timeout)
-			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+		final JobSubmitRequestBody requestBody = request.getRequestBody();
+		// loadJobGraph only schedules the deserialization; the file lookups here and below throw MissingFileException synchronously.
+		CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
+
+		Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
+
+		Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
+
+		CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts);
+
+		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
+		// Any failure in the chain (deserialization, upload, submission) is re-wrapped as INTERNAL_SERVER_ERROR below.
+		return jobSubmissionFuture.thenCombine(jobGraphFuture,
+			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
 			.exceptionally(exception -> {
 				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
 			});
 	}
+
+	/**
+	 * Looks up the uploaded JobGraph file (throwing if it is missing) and deserializes it via supplyAsync on the executor.
+	 * NOTE(review): this is Java-native deserialization of an uploaded file - confirm that only trusted clients reach it.
+	 */
+	private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody, Map<String, Path> nameToFile) throws MissingFileException {
+		final Path serializedGraph = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
+
+		return CompletableFuture.supplyAsync(() -> {
+			try (ObjectInputStream graphStream = new ObjectInputStream(serializedGraph.getFileSystem().open(serializedGraph))) {
+				return (JobGraph) graphStream.readObject();
+			} catch (Exception e) {
+				final RestHandlerException failure = new RestHandlerException("Failed to deserialize JobGraph.", HttpResponseStatus.BAD_REQUEST, e);
+				throw new CompletionException(failure);
+			}
+		}, executor);
+	}
+
+	// Maps every requested jar name to a copy of its uploaded path; the first name without an upload aborts with MissingFileException.
+	private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFileMap) throws MissingFileException {
+		final Collection<Path> resolvedJars = new ArrayList<>(jarFileNames.size());
+		for (String requestedName : jarFileNames) {
+			resolvedJars.add(new Path(getPathAndAssertUpload(requestedName, FILE_TYPE_JAR, nameToFileMap).toString()));
+		}
+		return resolvedJars;
+	}
+
+	// Pairs each distributed-cache entry name with a copy of its uploaded path; the first missing upload aborts with MissingFileException.
+	private static Collection<Tuple2<String, Path>> getArtifactFilesToUpload(
+			Collection<JobSubmitRequestBody.DistributedCacheFile> artifactEntries,
+			Map<String, Path> nameToFileMap) throws MissingFileException {
+		final Collection<Tuple2<String, Path>> resolvedArtifacts = new ArrayList<>(artifactEntries.size());
+		for (JobSubmitRequestBody.DistributedCacheFile entry : artifactEntries) {
+			resolvedArtifacts.add(Tuple2.of(entry.entryName, new Path(getPathAndAssertUpload(entry.fileName, FILE_TYPE_ARTIFACT, nameToFileMap).toString())));
+		}
+		return resolvedArtifacts;
+	}
+
+	/**
+	 * Once the JobGraph and the blob server port are both available, passes the graph, jars and artifacts to
+	 * ClientUtils.uploadJobGraphFiles with a BlobClient factory and completes with that same JobGraph instance.
+	 */
+	private CompletableFuture<JobGraph> uploadJobGraphFiles(
+			DispatcherGateway gateway,
+			CompletableFuture<JobGraph> jobGraphFuture,
+			Collection<Path> jarFiles,
+			Collection<Tuple2<String, Path>> artifacts) {
+		return jobGraphFuture.thenCombine(gateway.getBlobServerPort(timeout), (JobGraph jobGraph, Integer blobServerPort) -> {
+			final InetSocketAddress blobServerAddress = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+			try {
+				ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(blobServerAddress, new Configuration()));
+				return jobGraph;
+			} catch (FlinkException e) {
+				final RestHandlerException failure = new RestHandlerException("Could not upload job files.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+				throw new CompletionException(failure);
+			}
+		});
+	}
+
+	private static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> uploadedFiles) throws MissingFileException {
+		final Path uploaded = uploadedFiles.get(fileName); // null when no uploaded file carries this name
+		if (uploaded != null) {
+			return uploaded;
+		}
+		throw new MissingFileException(type, fileName);
+	}
+
+	private static final class MissingFileException extends RestHandlerException {
+		// Raised when a file named in the request is not among the uploaded files; carries HTTP status BAD_REQUEST.
+		private static final long serialVersionUID = -7954810495610194965L;
+
+		MissingFileException(String type, String fileName) {
+			super(type + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 83fab69..edefa15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -117,7 +117,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 	}
 
 	@Override
-	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+	protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
 		final ResourceID taskManagerId = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
 
 		final CompletableFuture<TransientBlobKey> blobKeyFuture;
@@ -152,7 +152,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 			},
 			ctx.executor());
 
-		resultFuture.whenComplete(
+		return resultFuture.whenComplete(
 			(Void ignored, Throwable throwable) -> {
 				if (throwable != null) {
 					log.error("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
deleted file mode 100644
index a845de3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * These headers define the protocol for querying the port of the blob server.
- */
-public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
-	private static final String URL = "/blobserver/port";
-	private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders();
-
-	private BlobServerPortHeaders() {
-	}
-
-	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
-	}
-
-	@Override
-	public HttpMethodWrapper getHttpMethod() {
-		return HttpMethodWrapper.GET;
-	}
-
-	@Override
-	public String getTargetRestEndpointURL() {
-		return URL;
-	}
-
-	@Override
-	public Class<BlobServerPortResponseBody> getResponseClass() {
-		return BlobServerPortResponseBody.class;
-	}
-
-	@Override
-	public HttpResponseStatus getResponseStatusCode() {
-		return HttpResponseStatus.OK;
-	}
-
-	@Override
-	public EmptyMessageParameters getUnresolvedMessageParameters() {
-		return EmptyMessageParameters.getInstance();
-	}
-
-	public static BlobServerPortHeaders getInstance() {
-		return INSTANCE;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Returns the port of blob server which can be used to upload jars.";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
deleted file mode 100644
index 895ecf3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Response containing the blob server port.
- */
-public final class BlobServerPortResponseBody implements ResponseBody {
-
-	static final String FIELD_NAME_PORT = "port";
-
-	/**
-	 * The port of the blob server.
-	 */
-	@JsonProperty(FIELD_NAME_PORT)
-	public final int port;
-
-	@JsonCreator
-	public BlobServerPortResponseBody(
-		@JsonProperty(FIELD_NAME_PORT) int port) {
-
-		this.port = port;
-	}
-
-	@Override
-	public int hashCode() {
-		return 67 * port;
-	}
-
-	@Override
-	public boolean equals(Object object) {
-		if (object instanceof BlobServerPortResponseBody) {
-			BlobServerPortResponseBody other = (BlobServerPortResponseBody) object;
-			return this.port == other.port;
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
index 88f53f2..42f64b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
+import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -71,6 +72,14 @@ public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, Jo
 
 	@Override
 	public String getDescription() {
-		return "Submits a job. This call is primarily intended to be used by the Flink client.";
+		// Fragments that end mid-sentence carry a trailing space so the concatenated text keeps its word boundaries.
+		return "Submits a job. This call is primarily intended to be used by the Flink client. This call expects a " +
+			"multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and " +
+			"distributed cache artifacts and an attribute named \"" + FileUploadHandler.HTTP_ATTRIBUTE_REQUEST + "\" for the JSON payload.";
+	}
+
+	@Override
+	public boolean acceptsFileUploads() {
+		return true; // the request is multipart/form-data carrying the JobGraph, jar and artifact files
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
index 3f550f0..0ca82b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -18,64 +18,119 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-	private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+	private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+	private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+	private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";
 
-	/**
-	 * The serialized job graph.
-	 */
-	@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-	public final byte[] serializedJobGraph;
+	@JsonProperty(FIELD_NAME_JOB_GRAPH)
+	public final String jobGraphFileName;
 
-	public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-		this(serializeJobGraph(jobGraph));
-	}
+	@JsonProperty(FIELD_NAME_JOB_JARS)
+	public final Collection<String> jarFileNames;
+
+	@JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+	public final Collection<DistributedCacheFile> artifactFileNames;
 
 	@JsonCreator
 	public JobSubmitRequestBody(
-			@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
-		this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+			@JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName,
+			@JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> jarFileNames,
+			@JsonProperty(FIELD_NAME_JOB_ARTIFACTS) Collection<DistributedCacheFile> artifactFileNames) {
+		this.jobGraphFileName = jobGraphFileName;
+		this.jarFileNames = jarFileNames == null ? java.util.Collections.emptyList() : jarFileNames; // a payload may omit the lists
+		this.artifactFileNames = artifactFileNames == null ? java.util.Collections.emptyList() : artifactFileNames;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+		return Objects.equals(jobGraphFileName, that.jobGraphFileName) &&
+			Objects.equals(jarFileNames, that.jarFileNames) &&
+			Objects.equals(artifactFileNames, that.artifactFileNames);
 	}
 
 	@Override
 	public int hashCode() {
-		return 71 * Arrays.hashCode(this.serializedJobGraph);
+		return Objects.hash(jobGraphFileName, jarFileNames, artifactFileNames);
 	}
 
 	@Override
-	public boolean equals(Object object) {
-		if (object instanceof JobSubmitRequestBody) {
-			JobSubmitRequestBody other = (JobSubmitRequestBody) object;
-			return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
-		}
-		return false;
+	public String toString() {
+		return "JobSubmitRequestBody{" +
+			"jobGraphFileName='" + jobGraphFileName + '\'' +
+			", jarFileNames=" + jarFileNames +
+			", artifactFileNames=" + artifactFileNames +
+			'}';
 	}
 
-	private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
-		try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
-			ObjectOutputStream out = new ObjectOutputStream(baos);
+	/**
+	 * Descriptor for a distributed cache file.
+	 */
+	public static class DistributedCacheFile {
+		private static final String FIELD_NAME_ENTRY_NAME = "entryName";
+		private static final String FIELD_NAME_FILE_NAME = "fileName";
+
+		@JsonProperty(FIELD_NAME_ENTRY_NAME)
+		public final String entryName;
+
+		@JsonProperty(FIELD_NAME_FILE_NAME)
+		public final String fileName;
+
+		@JsonCreator
+		public DistributedCacheFile(
+			@JsonProperty(FIELD_NAME_ENTRY_NAME) String entryName,
+			@JsonProperty(FIELD_NAME_FILE_NAME) String fileName) {
+			this.entryName = entryName;
+			this.fileName = fileName;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DistributedCacheFile that = (DistributedCacheFile) o;
+			return Objects.equals(entryName, that.entryName) &&
+				Objects.equals(fileName, that.fileName);
+		}
 
-			out.writeObject(jobGraph);
+		@Override
+		public int hashCode() {
+
+			return Objects.hash(entryName, fileName);
+		}
 
-			return baos.toByteArray();
+		@Override
+		public String toString() {
+			return "DistributedCacheFile{" +
+				"entryName='" + entryName + '\'' +
+				", fileName='" + fileName + '\'' +
+				'}';
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
index c5135bf..a538e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
@@ -23,6 +23,10 @@ import org.apache.flink.configuration.ConfigConstants;
 /**
  * This class contains constants to be used by rest components.
  */
-public class RestConstants {
+public enum RestConstants {
+	; // no enum values are declared; the type only holds the constants below
+
 	public static final String REST_CONTENT_TYPE = "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name();
+	public static final String CONTENT_TYPE_JAR = "application/java-archive";
+	public static final String CONTENT_TYPE_BINARY = "application/octet-stream";
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
index dc14cb1..f151f28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -85,7 +85,7 @@ public class ClientUtilsTest extends TestLogger {
 		assertEquals(jars.size(), jobGraph.getUserJars().size());
 		assertEquals(0, jobGraph.getUserJarBlobKeys().size());
 
-		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+		ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
 
 		assertEquals(jars.size(), jobGraph.getUserJars().size());
 		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
@@ -124,7 +124,7 @@ public class ClientUtilsTest extends TestLogger {
 		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
 		assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
 
-		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+		ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
 
 		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
 		assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
index 91fba68..607c1c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
@@ -81,7 +81,8 @@ public class AbstractHandlerTest extends TestLogger {
 		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
 			CompletableFuture.completedFuture(mockRestfulGateway);
 
-		TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+		CompletableFuture<Void> requestProcessingCompleteFuture = new CompletableFuture<>();
+		TestHandler handler = new TestHandler(requestProcessingCompleteFuture, CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
 
 		RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
 		HttpRequest request = new DefaultFullHttpRequest(
@@ -101,6 +102,9 @@ public class AbstractHandlerTest extends TestLogger {
 
 		handler.respondAsLeader(context, routerRequest, mockRestfulGateway);
 
+		// the (asynchronous) request processing is not yet complete so the files should still exist
+		Assert.assertTrue(Files.exists(file));
+		requestProcessingCompleteFuture.complete(null);
 		Assert.assertFalse(Files.exists(file));
 	}
 
@@ -156,14 +160,16 @@ public class AbstractHandlerTest extends TestLogger {
 	}
 
 	private static class TestHandler extends AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
+		private final CompletableFuture<Void> completionFuture;
 
-		protected TestHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
+		protected TestHandler(CompletableFuture<Void> completionFuture, @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
 			super(localAddressFuture, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE);
+			this.completionFuture = completionFuture;
 		}
 
 		@Override
-		protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
-
+		protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+			return completionFuture;
 		}
 
 		private enum TestHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
@@ -188,6 +194,11 @@ public class AbstractHandlerTest extends TestLogger {
 			public String getTargetRestEndpointURL() {
 				return "/test";
 			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return true;
+			}
 		}
 	}
 }