You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/06/22 14:06:49 UTC

[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r197456037
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -54,18 +69,89 @@ public JobSubmitHandler(
     
     	@Override
     	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    -		JobGraph jobGraph;
    -		try {
    -			ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
    -			jobGraph = (JobGraph) objectIn.readObject();
    -		} catch (Exception e) {
    -			throw new RestHandlerException(
    -				"Failed to deserialize JobGraph.",
    -				HttpResponseStatus.BAD_REQUEST,
    -				e);
    +		Collection<Path> uploadedFiles = request.getUploadedFiles();
    +		Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
    +			path -> path.getFileName().toString(),
    +			entry -> entry
    +		));
    +
    +		JobSubmitRequestBody requestBody = request.getRequestBody();
    +
    +		Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
    +
    +		Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
    +		for (String jarFileName : requestBody.jarFileNames) {
    +			Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
    +			jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
    +		}
    +
    +		Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
    +		for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) {
    +			Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
    +			artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
     		}
     
    -		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +		Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream()
    +			.collect(Collectors.toMap(
    +				tuple -> tuple.f0,
    +				// the actual entry definition is mostly irrelevant as only the blobkey is accessed
    +				// blame whoever wrote the ClientUtils API
    +				tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
    +			));
    +
    +		// TODO: use executor
    +		CompletableFuture<JobGraph> jobGraphFuture = CompletableFuture.supplyAsync(() -> {
    +			JobGraph jobGraph;
    +			try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
    +				jobGraph = (JobGraph) objectIn.readObject();
    +			} catch (Exception e) {
    +				throw new CompletionException(new RestHandlerException(
    +					"Failed to deserialize JobGraph.",
    +					HttpResponseStatus.BAD_REQUEST,
    +					e));
    +			}
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
    +			try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
    +				Collection<PermanentBlobKey> jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
    +				ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
    +
    +				Collection<Tuple2<String, PermanentBlobKey>> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient);
    +				ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
    +			} catch (IOException e) {
    +				throw new CompletionException(new RestHandlerException(
    +					"Could not upload job files.",
    +					HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +					e));
    --- End diff --
    
    They are cleaned up automatically by the `AbstractRestHandler`.


---