You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:28 UTC
[10/18] flink git commit: [FLINK-8966][tests] Upload user-jars in MiniCluster
[FLINK-8966][tests] Upload user-jars in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3947a395
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3947a395
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3947a395
Branch: refs/heads/master
Commit: 3947a395ecfe37de97be346416493d933cd3fe5a
Parents: 2426f78
Author: zentol <ch...@apache.org>
Authored: Wed Mar 14 12:49:11 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:30 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 38 +++++++++++++++++++-
1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3947a395/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e826eb..64d46c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -86,8 +89,10 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
- final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout);
+ final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+
+ final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
+ (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
@@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
+ private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) { // Uploads the job's user jars (if any) and adds the resulting blob keys to the job graph; the returned future completes once that is done.
+ List<Path> userJars = job.getUserJars();
+ if (!userJars.isEmpty()) { // only contact the blob server when there is something to upload
+ CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars()); // NOTE(review): calls job.getUserJars() again instead of reusing userJars; presumably the same list, confirm
+ return jarUploadFuture.thenAccept(blobKeys -> { // runs after the upload finished; the keys are only discarded into the job graph, hence the Void result
+ for (PermanentBlobKey blobKey : blobKeys) {
+ job.addBlob(blobKey); // register each uploaded jar's key on the job graph
+ }
+ });
+ } else {
+ LOG.debug("No jars to upload for job {}.", job.getJobID());
+ return CompletableFuture.completedFuture(null); // nothing to wait for: hand back an already-completed future
+ }
+ }
+
+ private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) { // Asks the dispatcher for its blob server port, then uploads the given jars through BlobClient; the future yields what BlobClient.uploadJarFiles returns.
+ return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
+ .thenApply(blobServerPort -> { // NOTE(review): non-async stage, so the upload below runs on whichever thread completes the port future (or the caller if it is already complete); confirm that is acceptable
+ InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort); // address = dispatcher hostname + the port it just reported
+
+ try {
+ return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+ } catch (IOException ioe) {
+ throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe)); // the lambda cannot throw a checked IOException; rethrow unchecked so the returned future completes exceptionally with the cause preserved
+ }
+ });
+ }
+
// ------------------------------------------------------------------------
// factories - can be overridden by subclasses to alter behavior
// ------------------------------------------------------------------------