You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/06/22 14:06:49 UTC
[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r197456037
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- JobGraph jobGraph;
- try {
- ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
- jobGraph = (JobGraph) objectIn.readObject();
- } catch (Exception e) {
- throw new RestHandlerException(
- "Failed to deserialize JobGraph.",
- HttpResponseStatus.BAD_REQUEST,
- e);
+ Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+ path -> path.getFileName().toString(),
+ entry -> entry
+ ));
+
+ JobSubmitRequestBody requestBody = request.getRequestBody();
+
+ Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+ Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
+ for (String jarFileName : requestBody.jarFileNames) {
+ Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
+ jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
+ }
+
+ Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
+ for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) {
+ Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
+ artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
}
- return gateway.submitJob(jobGraph, timeout)
- .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+ Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream()
+ .collect(Collectors.toMap(
+ tuple -> tuple.f0,
+ // the actual entry definition is mostly irrelevant as only the blobkey is accessed
+ // blame whoever wrote the ClientUtils API
+ tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
+ ));
+
+ // TODO: use executor
+ CompletableFuture<JobGraph> jobGraphFuture = CompletableFuture.supplyAsync(() -> {
+ JobGraph jobGraph;
+ try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
+ jobGraph = (JobGraph) objectIn.readObject();
+ } catch (Exception e) {
+ throw new CompletionException(new RestHandlerException(
+ "Failed to deserialize JobGraph.",
+ HttpResponseStatus.BAD_REQUEST,
+ e));
+ }
+ return jobGraph;
+ });
+
+ CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+ CompletableFuture<JobGraph> finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
+ final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+ try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
+ Collection<PermanentBlobKey> jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
+ ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
+
+ Collection<Tuple2<String, PermanentBlobKey>> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient);
+ ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
+ } catch (IOException e) {
+ throw new CompletionException(new RestHandlerException(
+ "Could not upload job files.",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ e));
--- End diff --
they are cleaned up automatically by the `AbstractRestHandler`
---