You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 13:31:10 UTC
[12/19] flink git commit: [FLINK-8966][tests] Upload user-jars in
MiniCluster
[FLINK-8966][tests] Upload user-jars in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7131d610
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7131d610
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7131d610
Branch: refs/heads/release-1.5
Commit: 7131d610928b0c9329254fbaa30b1252035c69b1
Parents: 491aceb
Author: zentol <ch...@apache.org>
Authored: Wed Mar 14 12:49:11 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:48 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 38 +++++++++++++++++++-
1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7131d610/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e826eb..64d46c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -86,8 +89,10 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
- final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+ final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+
+ final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
+ (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
@@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
+ private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
+ List<Path> userJars = job.getUserJars();
+ if (!userJars.isEmpty()) {
+ CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
+ return jarUploadFuture.thenAccept(blobKeys -> {
+ for (PermanentBlobKey blobKey : blobKeys) {
+ job.addBlob(blobKey);
+ }
+ });
+ } else {
+ LOG.debug("No jars to upload for job {}.", job.getJobID());
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+ return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
+ .thenApply(blobServerPort -> {
+ InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
+
+ try {
+ return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+ } catch (IOException ioe) {
+ throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+ }
+ });
+ }
+
// ------------------------------------------------------------------------
// factories - can be overridden by subclasses to alter behavior
// ------------------------------------------------------------------------