You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/28 10:21:23 UTC

flink git commit: [FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

Repository: flink
Updated Branches:
  refs/heads/master 81839d7e3 -> dd4c8469b


[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

This closes #6199.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd4c8469
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd4c8469
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd4c8469

Branch: refs/heads/master
Commit: dd4c8469b11184b633d2b9514b9910622734270f
Parents: 81839d7
Author: zentol <ch...@apache.org>
Authored: Wed Jun 13 18:21:21 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 28 12:21:06 2018 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  |  16 +-
 .../webmonitor/handlers/JarRunHandler.java      |  14 +-
 .../flink/runtime/client/ClientUtils.java       | 127 +++++++++++++++
 .../apache/flink/runtime/client/JobClient.java  |  16 +-
 .../client/JobSubmissionClientActor.java        |  23 +--
 .../apache/flink/runtime/jobgraph/JobGraph.java |  91 ++---------
 .../flink/runtime/minicluster/MiniCluster.java  |  53 ++-----
 .../flink/runtime/client/ClientUtilsTest.java   | 154 +++++++++++++++++++
 .../flink/runtime/jobgraph/JobGraphTest.java    |  59 ++++---
 9 files changed, 362 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8eb4ec0..85699d7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -323,17 +323,11 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
 				final int blobServerPort = response.port;
 				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-				final List<PermanentBlobKey> keys;
-				try {
-					log.info("Uploading jar files.");
-					keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-					jobGraph.uploadUserArtifacts(address, flinkConfig);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-				}
 
-				for (PermanentBlobKey key : keys) {
-					jobGraph.addUserJarBlobKey(key);
+				try {
+					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
+				} catch (Exception e) {
+					throw new CompletionException(e);
 				}
 
 				return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 0605bf1..10387c8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -46,7 +46,6 @@ import akka.actor.AddressFromURIString;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -114,15 +113,10 @@ public class JarRunHandler extends
 
 		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
 			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-			final List<PermanentBlobKey> keys;
 			try {
-				keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
-			} catch (IOException ioe) {
-				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
-			}
-
-			for (PermanentBlobKey key : keys) {
-				jobGraph.addUserJarBlobKey(key);
+				ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
+			} catch (FlinkException e) {
+				throw new CompletionException(e);
 			}
 
 			return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
new file mode 100644
index 0000000..fc6a621
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+	;
+
+	/**
+	 * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
+	 * the given {@link Supplier}.
+	 *
+	 * @param jobGraph jobgraph requiring files
+	 * @param clientSupplier supplier of blob client to upload files with
+	 * @throws FlinkException if the upload fails
+	 */
+	public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
+		List<Path> userJars = jobGraph.getUserJars();
+		Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+		if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
+			try (BlobClient client = clientSupplier.get()) {
+				uploadAndSetUserJars(jobGraph, client);
+				uploadAndSetUserArtifacts(jobGraph, client);
+			} catch (IOException ioe) {
+				throw new FlinkException("Could not upload job files.", ioe);
+			}
+		}
+	}
+
+	/**
+	 * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph   jobgraph requiring user jars
+	 * @param blobClient client to upload jars with
+	 * @throws IOException if the upload fails
+	 */
+	private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+		setUserJarBlobKeys(blobKeys, jobGraph);
+	}
+
+	private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+		for (Path jar : userJars) {
+			final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar);
+			blobKeys.add(blobKey);
+		}
+		return blobKeys;
+	}
+
+	private static void setUserJarBlobKeys(Collection<PermanentBlobKey> blobKeys, JobGraph jobGraph) {
+		blobKeys.forEach(jobGraph::addUserJarBlobKey);
+	}
+
+	/**
+	 * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph jobgraph requiring user artifacts
+	 * @param blobClient client to upload artifacts with
+	 * @throws IOException if the upload fails
+	 */
+	private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream()
+			.map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
+			.collect(Collectors.toList());
+
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
+		setUserArtifactBlobKeys(jobGraph, blobKeys);
+	}
+
+	private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Collection<Tuple2<String, Path>> userArtifacts, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = new ArrayList<>(userArtifacts.size());
+		for (Tuple2<String, Path> userArtifact : userArtifacts) {
+			// only upload local files
+			if (!userArtifact.f1.getFileSystem().isDistributedFS()) {
+				final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, userArtifact.f1);
+				blobKeys.add(Tuple2.of(userArtifact.f0, blobKey));
+			}
+		}
+		return blobKeys;
+	}
+
+	private static void setUserArtifactBlobKeys(JobGraph jobGraph, Collection<Tuple2<String, PermanentBlobKey>> blobKeys) throws IOException {
+		for (Tuple2<String, PermanentBlobKey> blobKey : blobKeys) {
+			jobGraph.setUserArtifactBlobKey(blobKey.f0, blobKey.f1);
+		}
+		jobGraph.writeUserArtifactEntriesToConfiguration();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 76d9bd6..27da3b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorRef;
@@ -422,18 +424,10 @@ public class JobClient {
 		}
 
 		try {
-			jobGraph.uploadUserJars(blobServerAddress, config);
-		}
-		catch (IOException e) {
-			throw new JobSubmissionException(jobGraph.getJobID(),
-				"Could not upload the program's JAR files to the JobManager.", e);
-		}
-
-		try {
-			jobGraph.uploadUserArtifacts(blobServerAddress, config);
-		} catch (IOException e) {
+			ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
+		} catch (FlinkException e) {
 			throw new JobSubmissionException(jobGraph.getJobID(),
-					"Could not upload custom user artifacts to the job manager.", e);
+					"Could not upload job files.", e);
 		}
 
 		CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 9b95633..2783b09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -31,13 +32,13 @@ import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -154,23 +155,9 @@ public class JobSubmissionClientActor extends JobClientActor {
 		final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync(
 			(InetSocketAddress blobServerAddress) -> {
 				try {
-					jobGraph.uploadUserJars(blobServerAddress, clientConfig);
-				} catch (IOException e) {
-					throw new CompletionException(
-						new JobSubmissionException(
-							jobGraph.getJobID(),
-							"Could not upload the jar files to the job manager.",
-							e));
-				}
-
-				try {
-					jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig);
-				} catch (IOException e) {
-					throw new CompletionException(
-						new JobSubmissionException(
-							jobGraph.getJobID(),
-							"Could not upload custom user artifacts to the job manager.",
-							e));
+					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
+				} catch (FlinkException e) {
+					throw new CompletionException(e);
 				}
 			},
 			getContext().dispatcher());

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 7231383..b3e03de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.InstantiationUtil;
@@ -32,12 +31,10 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -552,68 +549,25 @@ public class JobGraph implements Serializable {
 		return this.userJarBlobKeys;
 	}
 
-	/**
-	 * Uploads the previously added user JAR files to the job manager through
-	 * the job manager's BLOB server. The BLOB servers' address is given as a
-	 * parameter. This function issues a blocking call.
-	 *
-	 * @param blobServerAddress of the blob server to upload the jars to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the JobManager failed.
-	 */
-	public void uploadUserJars(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-		if (!userJars.isEmpty()) {
-			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-				blobServerAddress, blobClientConfig, jobID, userJars);
-
-			for (PermanentBlobKey blobKey : blobKeys) {
-				if (!userJarBlobKeys.contains(blobKey)) {
-					userJarBlobKeys.add(blobKey);
-				}
-			}
-		}
-	}
-
 	@Override
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";
 	}
 
-	/**
-	 * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them
-	 * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor.
-	 *
-	 * @param blobServerAddress of the blob server to upload the files to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the Blob server failed.
-	 */
-	public void uploadUserArtifacts(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
+	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
+		byte[] serializedBlobKey;
+		serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
 
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
+		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+			originalEntry.filePath,
+			originalEntry.isExecutable,
+			serializedBlobKey,
+			originalEntry.isZipped
+		));
+	}
 
+	public void writeUserArtifactEntriesToConfiguration() {
 		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-			Path filePath = new Path(userArtifact.getValue().filePath);
-
-			try {
-				if (filePath.getFileSystem().isDistributedFS()) {
-					distributeViaDFS.add(userArtifact);
-				} else {
-					uploadToBlobServer.add(userArtifact);
-				}
-
-			} catch (IOException ex) {
-				distributeViaDFS.add(userArtifact);
-			}
-		}
-
-		uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
 			DistributedCache.writeFileInfoToConfig(
 				userArtifact.getKey(),
 				userArtifact.getValue(),
@@ -621,27 +575,4 @@ public class JobGraph implements Serializable {
 			);
 		}
 	}
-
-	private void uploadViaBlob(
-			InetSocketAddress blobServerAddress,
-			Configuration clientConfig,
-			Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer) throws IOException {
-		if (!uploadToBlobServer.isEmpty()) {
-			try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
-				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : uploadToBlobServer) {
-					final PermanentBlobKey key = blobClient.uploadFile(jobID,
-						new Path(userArtifact.getValue().filePath));
-
-					DistributedCache.writeFileInfoToConfig(
-						userArtifact.getKey(),
-						new DistributedCache.DistributedCacheEntry(
-							userArtifact.getValue().filePath,
-							userArtifact.getValue().isExecutable,
-							InstantiationUtil.serializeObject(key),
-							userArtifact.getValue().isZipped),
-						jobConfiguration);
-				}
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b89617b..4fab2b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,12 +25,11 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -90,7 +89,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -583,8 +581,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		uploadUserArtifacts(job);
-
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		try {
@@ -608,7 +604,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		uploadUserArtifacts(job);
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
@@ -631,15 +626,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
-	private void uploadUserArtifacts(JobGraph job) throws JobExecutionException {
-		try {
-			final InetSocketAddress blobAddress = new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort());
-			job.uploadUserArtifacts(blobAddress, miniClusterConfiguration.getConfiguration());
-		} catch (IOException e) {
-			throw new JobExecutionException(job.getJobID(), "Could not upload user artifacts", e);
-		}
-	}
-
 	public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
 		final DispatcherGateway dispatcherGateway;
 		try {
@@ -653,7 +639,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		// from the ResourceManager
 		jobGraph.setAllowQueuedScheduling(true);
 
-		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGateway);
+
+		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
 
 		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
 			(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
@@ -685,32 +673,19 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
-	private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
-		List<Path> userJars = job.getUserJars();
-		if (!userJars.isEmpty()) {
-			CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
-			return jarUploadFuture.thenAccept(blobKeys -> {
-					for (PermanentBlobKey blobKey : blobKeys) {
-						job.addUserJarBlobKey(blobKey);
-					}
-				});
-		} else {
-			LOG.debug("No jars to upload for job {}.", job.getJobID());
-			return CompletableFuture.completedFuture(null);
-		}
+	private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
+		return blobServerAddressFuture.thenAccept(blobServerAddress -> {
+			try {
+				ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+			} catch (FlinkException e) {
+				throw new CompletionException(e);
+			}
+		});
 	}
 
-	private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+	private CompletableFuture<InetSocketAddress> createBlobServerAddress(final DispatcherGateway currentDispatcherGateway) {
 		return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
-			.thenApply(blobServerPort -> {
-				InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
-
-				try {
-					return BlobClient.uploadFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
-				}
-			});
+			.thenApply(blobServerPort -> new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort));
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
new file mode 100644
index 0000000..dc14cb1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ClientUtils}: uploading the user jars and user artifacts of a {@link JobGraph} to a {@link BlobServer}.
+ */
+public class ClientUtilsTest extends TestLogger {
+	// class-wide temporary directory; hosts the blob server storage directory and the files created by the tests
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+	// blob server shared by all tests; started in setup() and closed in teardown()
+	private static BlobServer blobServer = null;
+
+	@BeforeClass
+	public static void setup() throws IOException {
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
+	}
+
+	@AfterClass
+	public static void teardown() throws IOException {
+		if (blobServer != null) {
+			blobServer.close();
+		}
+	}
+
+	@Test
+	public void uploadAndSetUserJars() throws Exception {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+		// two freshly created (empty) files stand in for the user jars
+		Collection<Path> jars = Arrays.asList(
+			new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+			new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+		jars.forEach(jobGraph::addJar);
+		// before the upload: the jars are registered, but no blob keys exist yet
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+		// after the upload: the jars are still registered and there is one distinct blob key per jar
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
+		// each key is looked up on the blob server; presumably getFile() throws if the blob is missing (TODO confirm)
+		for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
+			blobServer.getFile(jobGraph.getJobID(), blobKey);
+		}
+	}
+
+	@Test
+	public void uploadAndSetUserArtifacts() throws Exception {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+		// entries backed by local files; the two boolean constructor flags vary so assertState() can check they are kept
+		Collection<DistributedCache.DistributedCacheEntry> localArtifacts = Arrays.asList(
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art1")).toString(), true, true),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art2")).toString(), true, false),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art3")).toString(), false, true),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art4")).toString(), true, false)
+		);
+		// an entry with an hdfs:// path; the assertions below expect it to end up without a blob key
+		Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts = Arrays.asList(
+			new DistributedCache.DistributedCacheEntry("hdfs://localhost:1234/test", true, false)
+		);
+		// register every entry keyed by its own file path
+		for (DistributedCache.DistributedCacheEntry entry : localArtifacts) {
+			jobGraph.addUserArtifact(entry.filePath, entry);
+		}
+		for (DistributedCache.DistributedCacheEntry entry : distributedArtifacts) {
+			jobGraph.addUserArtifact(entry.filePath, entry);
+		}
+
+		final int totalNumArtifacts = localArtifacts.size() + distributedArtifacts.size();
+		// before the upload: all entries are registered and none of them has a blob key
+		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+		assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+
+		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+		// after the upload: the entry count is unchanged and only the local entries carry a blob key
+		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+		assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+		assertEquals(distributedArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey == null).count());
+		// 1 unique key for each local artifact, plus a single null shared by the distributed artifacts
+		assertEquals(localArtifacts.size() + 1, jobGraph.getUserArtifacts().values().stream().map(entry -> entry.blobKey).distinct().count());
+		for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
+			assertState(original, jobGraph.getUserArtifacts().get(original.filePath), false, jobGraph.getJobID());
+		}
+		for (DistributedCache.DistributedCacheEntry original : distributedArtifacts) {
+			assertState(original, jobGraph.getUserArtifacts().get(original.filePath), true, jobGraph.getJobID());
+		}
+	}
+
+	private static void assertState(DistributedCache.DistributedCacheEntry original, DistributedCache.DistributedCacheEntry actual, boolean isBlobKeyNull, JobID jobId) throws Exception {
+		assertEquals(original.isZipped, actual.isZipped); // flags and path must be the same as on the original entry
+		assertEquals(original.isExecutable, actual.isExecutable);
+		assertEquals(original.filePath, actual.filePath);
+		assertEquals(isBlobKeyNull, actual.blobKey == null);
+		if (!isBlobKeyNull) { // key expected: deserialize it to a PermanentBlobKey and look it up on the blob server
+			blobServer.getFile(
+				jobId,
+				InstantiationUtil.<PermanentBlobKey>deserializeObject(actual.blobKey, ClientUtilsTest.class.getClassLoader()));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 227f6e3..160402b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import static org.junit.Assert.*;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 public class JobGraphTest extends TestLogger {
 
 	@Test
@@ -263,22 +266,6 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-
-	@Test
-	public void testConfiguringDistributedCache() throws Exception {
-		JobGraph testJob = new JobGraph("Test job");
-		testJob.addUserArtifact("dfsFile", new DistributedCache.DistributedCacheEntry("hdfs://tmp/file", false));
-
-		//it should never try to connect to that address
-		testJob.uploadUserArtifacts(new InetSocketAddress("localhost", 1111), new Configuration());
-
-		Configuration jobConfiguration = testJob.getJobConfiguration();
-		assertEquals(1, jobConfiguration.getInteger("DISTRIBUTED_CACHE_FILE_NUM", -1));
-		assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_DIR_1", true));
-		assertEquals("dfsFile", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_NAME_1", ""));
-		assertEquals("hdfs://tmp/file", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_PATH_1", ""));
-		assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_EXE_1", true));
-	}
 	
 	private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
 		boolean seenFirst = false;
@@ -294,4 +281,32 @@ public class JobGraphTest extends TestLogger {
 			}
 		}
 	}
+
+	@Test
+	public void testSetUserArtifactBlobKey() throws IOException, ClassNotFoundException {
+		JobGraph jb = new JobGraph();
+		// entries whose two boolean flags vary; the loop below checks that both are unchanged after setUserArtifactBlobKey()
+		final DistributedCache.DistributedCacheEntry[] entries = {
+			new DistributedCache.DistributedCacheEntry("p1", true, true),
+			new DistributedCache.DistributedCacheEntry("p2", true, false),
+			new DistributedCache.DistributedCacheEntry("p3", false, true),
+			new DistributedCache.DistributedCacheEntry("p4", true, false),
+		};
+		// register each entry keyed by its own file path
+		for (DistributedCache.DistributedCacheEntry entry : entries) {
+			jb.addUserArtifact(entry.filePath, entry);
+		}
+		// set a new blob key on each entry, then verify the key and the remaining fields on the entry held by the job graph
+		for (DistributedCache.DistributedCacheEntry entry : entries) {
+			PermanentBlobKey blobKey = new PermanentBlobKey();
+			jb.setUserArtifactBlobKey(entry.filePath, blobKey);
+			// the entry holds the key in serialized form, hence the deserialization before comparing
+			DistributedCache.DistributedCacheEntry jobGraphEntry = jb.getUserArtifacts().get(entry.filePath);
+			assertNotNull(jobGraphEntry);
+			assertEquals(blobKey, InstantiationUtil.deserializeObject(jobGraphEntry.blobKey, ClassLoader.getSystemClassLoader(), false));
+			assertEquals(entry.isExecutable, jobGraphEntry.isExecutable);
+			assertEquals(entry.isZipped, jobGraphEntry.isZipped);
+			assertEquals(entry.filePath, jobGraphEntry.filePath);
+		}
+	}
 }