You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 13:31:10 UTC

[12/19] flink git commit: [FLINK-8966][tests] Upload user-jars in MiniCluster

[FLINK-8966][tests] Upload user-jars in MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7131d610
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7131d610
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7131d610

Branch: refs/heads/release-1.5
Commit: 7131d610928b0c9329254fbaa30b1252035c69b1
Parents: 491aceb
Author: zentol <ch...@apache.org>
Authored: Wed Mar 14 12:49:11 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 14:26:48 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 38 +++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7131d610/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e826eb..64d46c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -86,8 +89,10 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		// from the ResourceManager
 		jobGraph.setAllowQueuedScheduling(true);
 
-		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+
+		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
+			(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
 
 		return acknowledgeCompletableFuture.thenApply(
 			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
@@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
+	private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
+		List<Path> userJars = job.getUserJars();
+		if (!userJars.isEmpty()) {
+			CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
+			return jarUploadFuture.thenAccept(blobKeys -> {
+					for (PermanentBlobKey blobKey : blobKeys) {
+						job.addBlob(blobKey);
+					}
+				});
+		} else {
+			LOG.debug("No jars to upload for job {}.", job.getJobID());
+			return CompletableFuture.completedFuture(null);
+		}
+	}
+
+	private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+		return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
+			.thenApply(blobServerPort -> {
+				InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
+
+				try {
+					return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+				} catch (IOException ioe) {
+					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+				}
+			});
+	}
+
 	// ------------------------------------------------------------------------
 	//  factories - can be overridden by subclasses to alter behavior
 	// ------------------------------------------------------------------------