You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/03 19:16:06 UTC

[3/3] flink git commit: [FLINK-9289][rest] Rework JobSubmitHandler to accept jar files

[FLINK-9289][rest] Rework JobSubmitHandler to accept jar files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/797709cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/797709cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/797709cb

Branch: refs/heads/release-1.5
Commit: 797709cb2466610b1d5b05c12e43d3f7d4f70183
Parents: 06b9bf1
Author: zentol <ch...@apache.org>
Authored: Mon Jun 11 11:45:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 18:32:08 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/rest_dispatcher.html   | 234 ++++++++-----------
 .../client/program/rest/RestClusterClient.java  | 120 +++++-----
 .../program/rest/RestClusterClientTest.java     |  23 --
 .../webmonitor/handlers/JarUploadHandler.java   |   5 +-
 .../handlers/JarUploadHandlerTest.java          |   2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  11 +-
 .../flink/runtime/rest/AbstractHandler.java     | 128 +++++-----
 .../rest/handler/AbstractRestHandler.java       |   6 +-
 .../flink/runtime/rest/handler/FileUploads.java |   9 +-
 .../runtime/rest/handler/HandlerRequest.java    |   8 +-
 .../rest/handler/job/BlobServerPortHandler.java |  66 ------
 .../rest/handler/job/JobSubmitHandler.java      | 130 +++++++++--
 .../AbstractTaskManagerFileHandler.java         |   4 +-
 .../rest/messages/BlobServerPortHeaders.java    |  74 ------
 .../messages/BlobServerPortResponseBody.java    |  57 -----
 .../rest/messages/job/JobSubmitHeaders.java     |  11 +-
 .../rest/messages/job/JobSubmitRequestBody.java |  66 +++---
 .../flink/runtime/rest/util/RestConstants.java  |   6 +-
 .../flink/runtime/rest/AbstractHandlerTest.java |  19 +-
 .../runtime/rest/MultipartUploadResource.java   |   7 +-
 .../runtime/rest/RestServerEndpointITCase.java  |   3 +-
 .../runtime/rest/handler/FileUploadsTest.java   |   6 +-
 .../handler/job/BlobServerPortHandlerTest.java  | 101 --------
 .../rest/handler/job/JobSubmitHandlerTest.java  | 181 +++++++++++---
 .../messages/BlobServerPortResponseTest.java    |  35 ---
 .../rest/messages/JobSubmitRequestBodyTest.java |   6 +-
 .../webmonitor/TestingDispatcherGateway.java    | 203 ++++++++++++++++
 .../webmonitor/TestingRestfulGateway.java       |  30 +--
 28 files changed, 810 insertions(+), 741 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index c74da9c..034a3d3 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -1,50 +1,6 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/blobserver/port</strong></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns the port of blob server which can be used to upload jars.</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#607508253">Request</button>
-        <div id="607508253" class="collapse">
-          <pre>
-            <code>
-{}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#1913718109">Response</button>
-        <div id="1913718109" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
-  "properties" : {
-    "port" : {
-      "type" : "integer"
-    }
-  }
-}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
-<table class="table table-bordered">
-  <tbody>
-    <tr>
       <td class="text-left" colspan="2"><strong>/cluster</strong></td>
     </tr>
     <tr>
@@ -226,19 +182,11 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#919877128">Request</button>
-        <div id="919877128" class="collapse">
+        <button data-toggle="collapse" data-target="#-1290030289">Request</button>
+        <div id="-1290030289" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:FileUpload",
-  "properties" : {
-    "path" : {
-      "type" : "string"
-    }
-  }
-}            </code>
+{}            </code>
           </pre>
          </div>
       </td>
@@ -607,7 +555,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
       <td class="text-left">Response code: <code>202 Accepted</code></td>
     </tr>
     <tr>
-      <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client.</td>
+      <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client. This call expects a multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and distributed cache artifacts and an attribute named "request" for the JSON payload.</td>
     </tr>
     <tr>
       <td colspan="2">
@@ -619,10 +567,13 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
   "type" : "object",
   "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
   "properties" : {
-    "serializedJobGraph" : {
+    "jobGraphFileName" : {
+      "type" : "string"
+    },
+    "jobJarFileNames" : {
       "type" : "array",
       "items" : {
-        "type" : "integer"
+        "type" : "string"
       }
     }
   }
@@ -2461,79 +2412,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns user-defined accumulators of a task, aggregated across all subtasks.</td>
-    </tr>
-    <tr>
-      <td colspan="2">Path parameters</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <ul>
-<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#485581006">Request</button>
-        <div id="485581006" class="collapse">
-          <pre>
-            <code>
-{}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" data-target="#-1070353054">Response</button>
-        <div id="-1070353054" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo",
-  "properties" : {
-    "id" : {
-      "type" : "string"
-    },
-    "user-accumulators" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
-        "properties" : {
-          "name" : {
-            "type" : "string"
-          },
-          "type" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
-}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
-<table class="table table-bordered">
-  <tbody>
-    <tr>
       <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
     </tr>
     <tr>
@@ -2675,6 +2553,100 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
 <table class="table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/accumulators</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns all user-defined accumulators for all subtasks of a task.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-886388859">Request</button>
+        <div id="-886388859" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#112317594">Response</button>
+        <div id="112317594" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo",
+  "properties" : {
+    "id" : {
+      "type" : "any"
+    },
+    "parallelism" : {
+      "type" : "integer"
+    },
+    "subtasks" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo",
+        "properties" : {
+          "subtask" : {
+            "type" : "integer"
+          },
+          "attempt" : {
+            "type" : "integer"
+          },
+          "host" : {
+            "type" : "string"
+          },
+          "user-accumulators" : {
+            "type" : "array",
+            "items" : {
+              "type" : "object",
+              "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
+              "properties" : {
+                "name" : {
+                  "type" : "string"
+                },
+                "type" : {
+                  "type" : "string"
+                },
+                "value" : {
+                  "type" : "string"
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
     </tr>
     <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 67233ea..adfd1df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -23,14 +23,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.NewClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -49,8 +50,6 @@ import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeader
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -87,10 +86,10 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
 import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ScalaUtils;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
@@ -102,20 +101,20 @@ import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import akka.actor.AddressFromURIString;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.ObjectOutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -313,41 +312,55 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 		// we have to enable queued scheduling because slot will be allocated lazily
 		jobGraph.setAllowQueuedScheduling(true);
 
-		log.info("Requesting blob server port.");
-		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
-
-		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
-			getDispatcherAddress(),
-			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
-				log.info("Uploading jar files.");
-				final int blobServerPort = response.port;
-				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-				final List<PermanentBlobKey> keys;
-				try {
-					keys = BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+		CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
+			try {
+				final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+				try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+					objectOut.writeObject(jobGraph);
 				}
+				return jobGraphFile;
+			} catch (IOException e) {
+				throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
+			}
+		}, executorService);
 
-				for (PermanentBlobKey key : keys) {
-					jobGraph.addBlob(key);
-				}
+		CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
+			List<String> jarFileNames = new ArrayList<>(8);
+			Collection<FileUpload> filesToUpload = new ArrayList<>(8);
 
-				return jobGraph;
-			});
+			filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
 
-		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
-			(JobGraph jobGraphToSubmit) -> {
-				log.info("Submitting job graph.");
+			for (Path jar : jobGraph.getUserJars()) {
+				jarFileNames.add(jar.getName());
+				filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+			}
 
-				try {
-					return sendRequest(
-						JobSubmitHeaders.getInstance(),
-						new JobSubmitRequestBody(jobGraph));
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
-				}
-			});
+			final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
+				jobGraphFile.getFileName().toString(),
+				jarFileNames
+			);
+
+			return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
+		});
+
+		final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
+			requestAndFileUploads -> sendRetriableRequest(
+				JobSubmitHeaders.getInstance(),
+				EmptyMessageParameters.getInstance(),
+				requestAndFileUploads.f0,
+				requestAndFileUploads.f1,
+				isConnectionProblemOrServiceUnavailable())
+		);
+
+		submissionFuture
+			.thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
+			.thenAccept(jobGraphFile -> {
+			try {
+				Files.delete(jobGraphFile);
+			} catch (IOException e) {
+				log.warn("Could not delete temporary file {}.", jobGraphFile, e);
+			}
+		});
 
 		return submissionFuture
 			.thenApply(
@@ -679,9 +692,14 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 
 	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
 			sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
+		return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate);
+	}
+
+	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+	sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
 		return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
 			try {
-				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload);
 			} catch (IOException e) {
 				throw new CompletionException(e);
 			}
@@ -739,26 +757,4 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 				}
 			}, executorService);
 	}
-
-	private CompletableFuture<String> getDispatcherAddress() {
-		return FutureUtils.orTimeout(
-				dispatcherLeaderRetriever.getLeaderFuture(),
-				restClusterClientConfiguration.getAwaitLeaderTimeout(),
-				TimeUnit.MILLISECONDS)
-			.thenApplyAsync(leaderAddressSessionId -> {
-				final String address = leaderAddressSessionId.f0;
-				final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
-
-				return host.orElseGet(() -> {
-					// if the dispatcher address does not contain a host part, then assume it's running
-					// on the same machine as the client
-					log.info("The dispatcher seems to run without remoting enabled. This indicates that we are " +
-						"in a test. This can only work if the RestClusterClient runs on the same machine. " +
-						"Assuming, therefore, 'localhost' as the host.");
-
-					return "localhost";
-				});
-			}, executorService);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f025d67..75f16c0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -53,8 +53,6 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -226,7 +224,6 @@ public class RestClusterClientTest extends TestLogger {
 
 	@Test
 	public void testJobSubmitCancelStop() throws Exception {
-		TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
 		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
 		TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
 		TestJobExecutionResultHandler testJobExecutionResultHandler =
@@ -237,15 +234,12 @@ public class RestClusterClientTest extends TestLogger {
 					.build()));
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
-			portHandler,
 			submitHandler,
 			terminationHandler,
 			testJobExecutionResultHandler)) {
 
-			Assert.assertFalse(portHandler.portRetrieved);
 			Assert.assertFalse(submitHandler.jobSubmitted);
 			restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-			Assert.assertTrue(portHandler.portRetrieved);
 			Assert.assertTrue(submitHandler.jobSubmitted);
 
 			Assert.assertFalse(terminationHandler.jobCanceled);
@@ -264,11 +258,9 @@ public class RestClusterClientTest extends TestLogger {
 	@Test
 	public void testDetachedJobSubmission() throws Exception {
 
-		final TestBlobServerPortHandler testBlobServerPortHandler = new TestBlobServerPortHandler();
 		final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
-			testBlobServerPortHandler,
 			testJobSubmitHandler)) {
 
 			restClusterClient.setDetached(true);
@@ -282,20 +274,6 @@ public class RestClusterClientTest extends TestLogger {
 
 	}
 
-	private class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-		private volatile boolean portRetrieved = false;
-
-		private TestBlobServerPortHandler() {
-			super(BlobServerPortHeaders.getInstance());
-		}
-
-		@Override
-		protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-			portRetrieved = true;
-			return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000));
-		}
-	}
-
 	private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 		private volatile boolean jobSubmitted = false;
 
@@ -390,7 +368,6 @@ public class RestClusterClientTest extends TestLogger {
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
 			testJobExecutionResultHandler,
-			new TestBlobServerPortHandler(),
 			new TestJobSubmitHandler())) {
 
 			JobExecutionResult jobExecutionResult;

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index a1ef82b..83db224 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -32,6 +32,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -70,11 +71,11 @@ public class JarUploadHandler extends
 	protected CompletableFuture<JarUploadResponseBody> handleRequest(
 			@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
 			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
-		Collection<Path> uploadedFiles = request.getUploadedFiles();
+		Collection<File> uploadedFiles = request.getUploadedFiles();
 		if (uploadedFiles.size() != 1) {
 			throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
 		}
-		final Path fileUpload = uploadedFiles.iterator().next();
+		final Path fileUpload = uploadedFiles.iterator().next().toPath();
 		return CompletableFuture.supplyAsync(() -> {
 			if (!fileUpload.getFileName().toString().endsWith(".jar")) {
 				throw new CompletionException(new RestHandlerException(

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index 812d4c6..c9e25ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -131,6 +131,6 @@ public class JarUploadHandlerTest extends TestLogger {
 			EmptyMessageParameters.getInstance(),
 			Collections.emptyMap(),
 			Collections.emptyMap(),
-			Collections.singleton(uploadedFile));
+			Collections.singleton(uploadedFile.toFile()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8072cf4..4279330 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -88,17 +87,12 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 
 		final Time timeout = restConfiguration.getTimeout();
 
-		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(
-			restAddressFuture,
-			leaderRetriever,
-			timeout,
-			responseHeaders);
-
 		JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
-			responseHeaders);
+			responseHeaders,
+			executor);
 
 		if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
 			try {
@@ -125,7 +119,6 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			log.info("Web-based job submission is not enabled.");
 		}
 
-		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
 
 		return handlers;

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 41d242b..e8d9384 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -89,87 +91,73 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 
 		final HttpRequest httpRequest = routed.request();
 
+		FileUploads uploadedFiles = null;
 		try {
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
 				log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
-				HandlerUtils.sendErrorResponse(
-					ctx,
-					httpRequest,
-					new ErrorResponseBody("Bad request received."),
-					HttpResponseStatus.BAD_REQUEST,
-					responseHeaders);
-				return;
+				throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST);
 			}
 
 			final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
 
-			try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+			uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx);
 
-				if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody("File uploads not allowed."),
-						HttpResponseStatus.BAD_REQUEST,
-						responseHeaders);
-					return;
-				}
+			if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
+				throw new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST);
+			}
 
-				R request;
-				if (msgContent.capacity() == 0) {
-					try {
-						request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-					} catch (JsonParseException | JsonMappingException je) {
-						log.error("Request did not conform to expected format.", je);
-						HandlerUtils.sendErrorResponse(
-							ctx,
-							httpRequest,
-							new ErrorResponseBody("Bad request received."),
-							HttpResponseStatus.BAD_REQUEST,
-							responseHeaders);
-						return;
-					}
-				} else {
-					try {
-						ByteBufInputStream in = new ByteBufInputStream(msgContent);
-						request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
-					} catch (JsonParseException | JsonMappingException je) {
-						log.error("Failed to read request.", je);
-						HandlerUtils.sendErrorResponse(
-							ctx,
-							httpRequest,
-							new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
-							HttpResponseStatus.BAD_REQUEST,
-							responseHeaders);
-						return;
-					}
+			R request;
+			if (msgContent.capacity() == 0) {
+				try {
+					request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+				} catch (JsonParseException | JsonMappingException je) {
+					log.error("Request did not conform to expected format.", je);
+					throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
 				}
-
-				final HandlerRequest<R, M> handlerRequest;
-
+			} else {
 				try {
-					handlerRequest = new HandlerRequest<>(request, untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams(), uploadedFiles.getUploadedFiles());
-				} catch (HandlerRequestException hre) {
-					log.error("Could not create the handler request.", hre);
-
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+					ByteBufInputStream in = new ByteBufInputStream(msgContent);
+					request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
+				} catch (JsonParseException | JsonMappingException je) {
+					log.error("Failed to read request.", je);
+					throw new RestHandlerException(
+						String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
 						HttpResponseStatus.BAD_REQUEST,
-						responseHeaders);
-					return;
+						je);
 				}
+			}
+
+			final HandlerRequest<R, M> handlerRequest;
 
-				respondToRequest(
-					ctx,
-					httpRequest,
-					handlerRequest,
-					gateway);
+			try {
+				handlerRequest = new HandlerRequest<>(request, untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams(), uploadedFiles.getUploadedFiles());
+			} catch (HandlerRequestException hre) {
+				log.error("Could not create the handler request.", hre);
+				throw new RestHandlerException(
+					String.format("Bad request, could not parse parameters: %s", hre.getMessage()),
+					HttpResponseStatus.BAD_REQUEST,
+					hre);
 			}
 
+			CompletableFuture<Void> requestProcessingFuture = respondToRequest(
+				ctx,
+				httpRequest,
+				handlerRequest,
+				gateway);
+
+			final FileUploads finalUploadedFiles = uploadedFiles;
+			requestProcessingFuture
+				.whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+		} catch (RestHandlerException rhe) {
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				httpRequest,
+				new ErrorResponseBody(rhe.getMessage()),
+				rhe.getHttpResponseStatus(),
+				responseHeaders);
+			cleanupFileUploads(uploadedFiles);
 		} catch (Throwable e) {
 			log.error("Request processing failed.", e);
 			HandlerUtils.sendErrorResponse(
@@ -178,6 +166,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 				new ErrorResponseBody("Internal server error."),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				responseHeaders);
+			cleanupFileUploads(uploadedFiles);
+		}
+	}
+
+	private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
+		if (uploadedFiles != null) {
+			try {
+				uploadedFiles.close();
+			} catch (IOException e) {
+				log.warn("Could not cleanup uploaded files.", e);
+			}
 		}
 	}
 
@@ -188,9 +187,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 	 * @param httpRequest original http request
 	 * @param handlerRequest typed handler request
 	 * @param gateway leader gateway
+	 * @return Future which is completed once the request has been processed
 	 * @throws RestHandlerException if an exception occurred while responding
 	 */
-	protected abstract void respondToRequest(
+	protected abstract CompletableFuture<Void> respondToRequest(
 		ChannelHandlerContext ctx,
 		HttpRequest httpRequest,
 		HandlerRequest<R, M> handlerRequest,

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 448711b..e4cec08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 	}
 
 	@Override
-	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
+	protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
 		CompletableFuture<P> response;
 
 		try {
@@ -79,7 +79,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 			response = FutureUtils.completedExceptionally(e);
 		}
 
-		response.whenComplete((P resp, Throwable throwable) -> {
+		return response.whenComplete((P resp, Throwable throwable) -> {
 			if (throwable != null) {
 
 				Throwable error = ExceptionUtils.stripCompletionException(throwable);
@@ -105,7 +105,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 					messageHeaders.getResponseStatusCode(),
 					responseHeaders);
 			}
-		});
+		}).thenApply(ignored -> null);
 	}
 
 	private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest, RestHandlerException rhe) {

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
index 31ac47bb..b233cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
@@ -58,7 +59,7 @@ public final class FileUploads implements AutoCloseable {
 		this.uploadDirectory = uploadDirectory;
 	}
 
-	public Collection<Path> getUploadedFiles() throws IOException {
+	public Collection<File> getUploadedFiles() throws IOException {
 		if (uploadDirectory == null) {
 			return Collections.emptyList();
 		}
@@ -78,9 +79,9 @@ public final class FileUploads implements AutoCloseable {
 
 	private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
 
-		private final Collection<Path> files = new ArrayList<>(4);
+		private final Collection<File> files = new ArrayList<>(4);
 
-		Collection<Path> getContainedFiles() {
+		Collection<File> getContainedFiles() {
 			return files;
 		}
 
@@ -90,7 +91,7 @@ public final class FileUploads implements AutoCloseable {
 		@Override
 		public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
 			FileVisitResult result = super.visitFile(file, attrs);
-			files.add(file);
+			files.add(file.toFile());
 			return result;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 7e93556..990dae5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
-import java.nio.file.Path;
+import java.io.File;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +43,7 @@ import java.util.StringJoiner;
 public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
 
 	private final R requestBody;
-	private final Collection<Path> uploadedFiles;
+	private final Collection<File> uploadedFiles;
 	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
 	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
@@ -55,7 +55,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 		this(requestBody, messageParameters, receivedPathParameters, receivedQueryParameters, Collections.emptyList());
 	}
 
-	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<Path> uploadedFiles) throws HandlerRequestException {
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<File> uploadedFiles) throws HandlerRequestException {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
 		this.uploadedFiles = Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
 		Preconditions.checkNotNull(messageParameters);
@@ -141,7 +141,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 	}
 
 	@Nonnull
-	public Collection<Path> getUploadedFiles() {
+	public Collection<File> getUploadedFiles() {
 		return uploadedFiles;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
deleted file mode 100644
index 4b5fa89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-
-/**
- * This handler can be used to retrieve the port that the blob server runs on.
- */
-public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
-	public BlobServerPortHandler(
-			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
-			Time timeout,
-			Map<String, String> headers) {
-		super(localRestAddress, leaderRetriever, timeout, headers, BlobServerPortHeaders.getInstance());
-	}
-
-	@Override
-	protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-		return gateway
-			.getBlobServerPort(timeout)
-			.thenApply(BlobServerPortResponseBody::new)
-			.exceptionally(error -> {
-				throw new CompletionException(new RestHandlerException(
-					"Failed to retrieve blob server port.",
-					HttpResponseStatus.INTERNAL_SERVER_ERROR,
-					ExceptionUtils.stripCompletionException(error)));
-			});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index af04629..dfa9591 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -19,8 +19,13 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -34,42 +39,139 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 
+	private static final String FILE_TYPE_JOB_GRAPH = "JobGraph";
+	private static final String FILE_TYPE_JAR = "Jar";
+
+	private final Executor executor;
+
 	public JobSubmitHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			Time timeout,
-			Map<String, String> headers) {
+			Map<String, String> headers,
+			Executor executor) {
 		super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+		this.executor = executor;
 	}
 
 	@Override
 	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-		JobGraph jobGraph;
-		try {
-			ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-			jobGraph = (JobGraph) objectIn.readObject();
-		} catch (Exception e) {
+		final Collection<File> uploadedFiles = request.getUploadedFiles();
+		final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+			File::getName,
+			Path::fromLocalFile
+		));
+
+		if (uploadedFiles.size() != nameToFile.size()) {
 			throw new RestHandlerException(
-				"Failed to deserialize JobGraph.",
-				HttpResponseStatus.BAD_REQUEST,
-				e);
+				String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+					uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+					nameToFile.size(),
+					uploadedFiles.size()),
+				HttpResponseStatus.BAD_REQUEST
+			);
 		}
 
-		return gateway.submitJob(jobGraph, timeout)
-			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+		final JobSubmitRequestBody requestBody = request.getRequestBody();
+
+		CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
+
+		Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
+
+		CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles);
+
+		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
+
+		return jobSubmissionFuture.thenCombine(jobGraphFuture,
+			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
 			.exceptionally(exception -> {
-				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
-			});
+			throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
+		});
+	}
+
+	private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody, Map<String, Path> nameToFile) throws MissingFileException {
+		final Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
+
+		return CompletableFuture.supplyAsync(() -> {
+			JobGraph jobGraph;
+			try (ObjectInputStream objectIn = new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))) {
+				jobGraph = (JobGraph) objectIn.readObject();
+			} catch (Exception e) {
+				throw new CompletionException(new RestHandlerException(
+					"Failed to deserialize JobGraph.",
+					HttpResponseStatus.BAD_REQUEST,
+					e));
+			}
+			return jobGraph;
+		}, executor);
+	}
+
+	private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFileMap) throws MissingFileException {
+		Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size());
+		for (String jarFileName : jarFileNames) {
+			Path jarFile = getPathAndAssertUpload(jarFileName, FILE_TYPE_JAR, nameToFileMap);
+			jarFiles.add(new Path(jarFile.toString()));
+		}
+		return jarFiles;
+	}
+
+	private CompletableFuture<JobGraph> uploadJobGraphFiles(
+			DispatcherGateway gateway,
+			CompletableFuture<JobGraph> jobGraphFuture,
+			Collection<Path> jarFiles) {
+		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+		return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
+			final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+			if (!jarFiles.isEmpty()) {
+				try {
+					final List<PermanentBlobKey> permanentBlobKeys = BlobClient.uploadJarFiles(address, new Configuration(), jobGraph.getJobID(), new ArrayList<>(jarFiles));
+					for (PermanentBlobKey blobKey : permanentBlobKeys) {
+						jobGraph.addBlob(blobKey);
+					}
+				} catch (IOException e) {
+					throw new CompletionException(new RestHandlerException(
+						"Could not upload job files.",
+						HttpResponseStatus.INTERNAL_SERVER_ERROR,
+						e));
+				}
+			}
+			return jobGraph;
+		});
+	}
+
+	private static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> uploadedFiles) throws MissingFileException {
+		final Path file = uploadedFiles.get(fileName);
+		if (file == null) {
+			throw new MissingFileException(type, fileName);
+		}
+		return file;
+	}
+
+	private static final class MissingFileException extends RestHandlerException {
+
+		private static final long serialVersionUID = -7954810495610194965L;
+
+		MissingFileException(String type, String fileName) {
+			super(type + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 5b5d97d..265813f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -117,7 +117,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 	}
 
 	@Override
-	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+	protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
 		final ResourceID taskManagerId = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
 
 		final CompletableFuture<TransientBlobKey> blobKeyFuture;
@@ -152,7 +152,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
 			},
 			ctx.executor());
 
-		resultFuture.whenComplete(
+		return resultFuture.whenComplete(
 			(Void ignored, Throwable throwable) -> {
 				if (throwable != null) {
 					log.debug("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
deleted file mode 100644
index a845de3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * These headers define the protocol for querying the port of the blob server.
- */
-public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
-	private static final String URL = "/blobserver/port";
-	private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders();
-
-	private BlobServerPortHeaders() {
-	}
-
-	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
-	}
-
-	@Override
-	public HttpMethodWrapper getHttpMethod() {
-		return HttpMethodWrapper.GET;
-	}
-
-	@Override
-	public String getTargetRestEndpointURL() {
-		return URL;
-	}
-
-	@Override
-	public Class<BlobServerPortResponseBody> getResponseClass() {
-		return BlobServerPortResponseBody.class;
-	}
-
-	@Override
-	public HttpResponseStatus getResponseStatusCode() {
-		return HttpResponseStatus.OK;
-	}
-
-	@Override
-	public EmptyMessageParameters getUnresolvedMessageParameters() {
-		return EmptyMessageParameters.getInstance();
-	}
-
-	public static BlobServerPortHeaders getInstance() {
-		return INSTANCE;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Returns the port of blob server which can be used to upload jars.";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
deleted file mode 100644
index 895ecf3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Response containing the blob server port.
- */
-public final class BlobServerPortResponseBody implements ResponseBody {
-
-	static final String FIELD_NAME_PORT = "port";
-
-	/**
-	 * The port of the blob server.
-	 */
-	@JsonProperty(FIELD_NAME_PORT)
-	public final int port;
-
-	@JsonCreator
-	public BlobServerPortResponseBody(
-		@JsonProperty(FIELD_NAME_PORT) int port) {
-
-		this.port = port;
-	}
-
-	@Override
-	public int hashCode() {
-		return 67 * port;
-	}
-
-	@Override
-	public boolean equals(Object object) {
-		if (object instanceof BlobServerPortResponseBody) {
-			BlobServerPortResponseBody other = (BlobServerPortResponseBody) object;
-			return this.port == other.port;
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
index 88f53f2..42f64b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
+import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -71,6 +72,14 @@ public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, Jo
 
 	@Override
 	public String getDescription() {
-		return "Submits a job. This call is primarily intended to be used by the Flink client.";
+		return "Submits a job. This call is primarily intended to be used by the Flink client. This call expects a " +
+			"multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and " +
+			"distributed cache artifacts and an attribute named \"" + FileUploadHandler.HTTP_ATTRIBUTE_REQUEST + "\" for " +
+			"the JSON payload.";
+	}
+
+	@Override
+	public boolean acceptsFileUploads() {
+		return true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
index 3f550f0..8829bc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -18,64 +18,62 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-	private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+	private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+	private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
 
-	/**
-	 * The serialized job graph.
-	 */
-	@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-	public final byte[] serializedJobGraph;
+	@JsonProperty(FIELD_NAME_JOB_GRAPH)
+	public final String jobGraphFileName;
 
-	public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-		this(serializeJobGraph(jobGraph));
-	}
+	@JsonProperty(FIELD_NAME_JOB_JARS)
+	public final Collection<String> jarFileNames;
 
 	@JsonCreator
 	public JobSubmitRequestBody(
-			@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
-		this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+			@JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName,
+			@JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> jarFileNames) {
+		this.jobGraphFileName = jobGraphFileName;
+		this.jarFileNames = jarFileNames;
 	}
 
 	@Override
-	public int hashCode() {
-		return 71 * Arrays.hashCode(this.serializedJobGraph);
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+		return Objects.equals(jobGraphFileName, that.jobGraphFileName) &&
+			Objects.equals(jarFileNames, that.jarFileNames);
 	}
 
 	@Override
-	public boolean equals(Object object) {
-		if (object instanceof JobSubmitRequestBody) {
-			JobSubmitRequestBody other = (JobSubmitRequestBody) object;
-			return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
-		}
-		return false;
+	public int hashCode() {
+		return Objects.hash(jobGraphFileName, jarFileNames);
 	}
 
-	private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
-		try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
-			ObjectOutputStream out = new ObjectOutputStream(baos);
-
-			out.writeObject(jobGraph);
-
-			return baos.toByteArray();
-		}
+	@Override
+	public String toString() {
+		return "JobSubmitRequestBody{" +
+			"jobGraphFileName='" + jobGraphFileName + '\'' +
+			", jarFileNames=" + jarFileNames +
+			'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
index c5135bf..a538e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
@@ -23,6 +23,10 @@ import org.apache.flink.configuration.ConfigConstants;
 /**
  * This class contains constants to be used by rest components.
  */
-public class RestConstants {
+public enum RestConstants {
+	;
+
 	public static final String REST_CONTENT_TYPE = "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name();
+	public static final String CONTENT_TYPE_JAR = "application/java-archive";
+	public static final String CONTENT_TYPE_BINARY = "application/octet-stream";
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
index ffcc72a..e759d30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
@@ -80,7 +80,8 @@ public class AbstractHandlerTest extends TestLogger {
 		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
 			CompletableFuture.completedFuture(mockRestfulGateway);
 
-		TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+		CompletableFuture<Void> requestProcessingCompleteFuture = new CompletableFuture<>();
+		TestHandler handler = new TestHandler(requestProcessingCompleteFuture, CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
 
 		HttpRequest request = new DefaultFullHttpRequest(
 			HttpVersion.HTTP_1_1,
@@ -99,6 +100,9 @@ public class AbstractHandlerTest extends TestLogger {
 
 		handler.respondAsLeader(context, routerRequest, mockRestfulGateway);
 
+		// the (asynchronous) request processing is not yet complete so the files should still exist
+		Assert.assertTrue(Files.exists(file));
+		requestProcessingCompleteFuture.complete(null);
 		Assert.assertFalse(Files.exists(file));
 	}
 
@@ -154,14 +158,16 @@ public class AbstractHandlerTest extends TestLogger {
 	}
 
 	private static class TestHandler extends AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
+		private final CompletableFuture<Void> completionFuture;
 
-		protected TestHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
+		protected TestHandler(CompletableFuture<Void> completionFuture, @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
 			super(localAddressFuture, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE);
+			this.completionFuture = completionFuture;
 		}
 
 		@Override
-		protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
-
+		protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+			return completionFuture;
 		}
 
 		private enum TestHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
@@ -186,6 +192,11 @@ public class AbstractHandlerTest extends TestLogger {
 			public String getTargetRestEndpointURL() {
 				return "/test";
 			}
+
+			@Override
+			public boolean acceptsFileUploads() {
+				return true;
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 1311b80..0153d5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -66,6 +66,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertArrayEquals;
@@ -203,7 +204,7 @@ public class MultipartUploadResource extends ExternalResource {
 
 		@Override
 		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-			MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles());
+			MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList()));
 			this.lastReceivedRequest = request.getRequestBody();
 			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 		}
@@ -268,7 +269,7 @@ public class MultipartUploadResource extends ExternalResource {
 
 		@Override
 		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-			Collection<Path> uploadedFiles = request.getUploadedFiles();
+			Collection<Path> uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
 			if (!uploadedFiles.isEmpty()) {
 				throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
 			}
@@ -313,7 +314,7 @@ public class MultipartUploadResource extends ExternalResource {
 
 		@Override
 		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
-			verifyFileUpload(expectedFiles, request.getUploadedFiles());
+			verifyFileUpload(expectedFiles, request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList()));
 			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index b9413ba..93dbb5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -85,6 +85,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 import static org.hamcrest.Matchers.containsString;
@@ -661,7 +662,7 @@ public class RestServerEndpointITCase extends TestLogger {
 
 		@Override
 		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException {
-			Collection<Path> uploadedFiles = request.getUploadedFiles();
+			Collection<Path> uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
 			if (uploadedFiles.size() != 1) {
 				throw new RestHandlerException("Expected 1 file, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
index fb7faa3..2b14641 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
@@ -25,11 +25,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.stream.Collectors;
 
 /**
  * Tests for {@link FileUploads}.
@@ -64,7 +66,7 @@ public class FileUploadsTest extends TestLogger {
 		Files.createFile(tmp.resolve(subFile));
 
 		try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
-			Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+			Collection<Path> detectedFiles = fileUploads.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
 
 			Assert.assertEquals(2, detectedFiles.size());
 			Assert.assertTrue(detectedFiles.contains(tmp.resolve(rootFile)));
@@ -80,7 +82,7 @@ public class FileUploadsTest extends TestLogger {
 		Files.createDirectory(tmp.resolve(rootDir));
 
 		try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
-			Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+			Collection<File> detectedFiles = fileUploads.getUploadedFiles();
 			Assert.assertEquals(0, detectedFiles.size());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/797709cb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
deleted file mode 100644
index 15c2eb4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the {@link BlobServerPortHandler}.
- */
-public class BlobServerPortHandlerTest extends TestLogger {
-	private static final int PORT = 64;
-
-	@Test
-	public void testPortRetrieval() throws Exception {
-		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-		when(mockGateway.getBlobServerPort(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(PORT));
-		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-
-		BlobServerPortHandler handler = new BlobServerPortHandler(
-			CompletableFuture.completedFuture("http://localhost:1234"),
-			mockGatewayRetriever,
-			RpcUtils.INF_TIMEOUT,
-			Collections.emptyMap());
-
-		BlobServerPortResponseBody portResponse = handler
-			.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
-			.get();
-
-		Assert.assertEquals(PORT, portResponse.port);
-	}
-
-	@Test
-	public void testPortRetrievalFailureHandling() throws Exception {
-		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
-		when(mockGateway.getBlobServerPort(any(Time.class)))
-			.thenReturn(FutureUtils.completedExceptionally(new TestException()));
-		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-
-		BlobServerPortHandler handler = new BlobServerPortHandler(
-			CompletableFuture.completedFuture("http://localhost:1234"),
-			mockGatewayRetriever,
-			RpcUtils.INF_TIMEOUT,
-			Collections.emptyMap());
-
-		try {
-			handler
-				.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
-				.get();
-			Assert.fail();
-		} catch (ExecutionException ee) {
-			RestHandlerException rhe = (RestHandlerException) ee.getCause();
-
-			Assert.assertEquals(TestException.class, rhe.getCause().getClass());
-			Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, rhe.getHttpResponseStatus());
-		}
-	}
-
-	private static class TestException extends Exception {
-		private static final long serialVersionUID = -7064446788277853899L;
-	}
-}