You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/28 10:21:23 UTC
flink git commit: [FLINK-9624][runtime] Move jar/artifact upload out of jobgraph
Repository: flink
Updated Branches:
refs/heads/master 81839d7e3 -> dd4c8469b
[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph
This closes #6199.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd4c8469
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd4c8469
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd4c8469
Branch: refs/heads/master
Commit: dd4c8469b11184b633d2b9514b9910622734270f
Parents: 81839d7
Author: zentol <ch...@apache.org>
Authored: Wed Jun 13 18:21:21 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 28 12:21:06 2018 +0200
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 16 +-
.../webmonitor/handlers/JarRunHandler.java | 14 +-
.../flink/runtime/client/ClientUtils.java | 127 +++++++++++++++
.../apache/flink/runtime/client/JobClient.java | 16 +-
.../client/JobSubmissionClientActor.java | 23 +--
.../apache/flink/runtime/jobgraph/JobGraph.java | 91 ++---------
.../flink/runtime/minicluster/MiniCluster.java | 53 ++-----
.../flink/runtime/client/ClientUtilsTest.java | 154 +++++++++++++++++++
.../flink/runtime/jobgraph/JobGraphTest.java | 59 ++++---
9 files changed, 362 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8eb4ec0..85699d7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -323,17 +323,11 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
(BlobServerPortResponseBody response, String dispatcherAddress) -> {
final int blobServerPort = response.port;
final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
- final List<PermanentBlobKey> keys;
- try {
- log.info("Uploading jar files.");
- keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
- jobGraph.uploadUserArtifacts(address, flinkConfig);
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
- }
- for (PermanentBlobKey key : keys) {
- jobGraph.addUserJarBlobKey(key);
+ try {
+ ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
+ } catch (Exception e) {
+ throw new CompletionException(e);
}
return jobGraph;
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 0605bf1..10387c8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -46,7 +46,6 @@ import akka.actor.AddressFromURIString;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -114,15 +113,10 @@ public class JarRunHandler extends
CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
- final List<PermanentBlobKey> keys;
try {
- keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
- }
-
- for (PermanentBlobKey key : keys) {
- jobGraph.addUserJarBlobKey(key);
+ ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
+ } catch (FlinkException e) {
+ throw new CompletionException(e);
}
return jobGraph;
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
new file mode 100644
index 0000000..fc6a621
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Utility methods for clients, such as uploading the files required by a {@link JobGraph} to the blob server.
+ */
+public enum ClientUtils {
+ ;
+
+ /**
+ * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
+ * the given {@link Supplier}.
+ *
+ * @param jobGraph jobgraph requiring files
+ * @param clientSupplier supplier of blob client to upload files with
+ * @throws FlinkException if the blob client cannot be created or closed, or if any upload fails
+ */
+ public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
+ List<Path> userJars = jobGraph.getUserJars();
+ Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+ // only create a blob client (i.e. connect to the blob server) if there is something to upload
+ if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
+ try (BlobClient client = clientSupplier.get()) {
+ uploadAndSetUserJars(jobGraph, client);
+ uploadAndSetUserArtifacts(jobGraph, client);
+ } catch (IOException ioe) {
+ throw new FlinkException("Could not upload job files.", ioe);
+ }
+ }
+ }
+
+ /**
+ * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
+ * and registers the resulting blob keys with the job graph.
+ *
+ * @param jobGraph jobgraph requiring user jars
+ * @param blobClient client to upload jars with
+ * @throws IOException if the upload fails
+ */
+ private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+ uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient).forEach(jobGraph::addUserJarBlobKey);
+ }
+
+ /** Uploads each jar for the given job and returns the resulting blob keys in iteration order. */
+ private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+ Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+ for (Path jar : userJars) {
+ final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar);
+ blobKeys.add(blobKey);
+ }
+ return blobKeys;
+ }
+
+ /**
+ * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
+ * sets the resulting blob keys and writes all artifact entries to the job configuration.
+ *
+ * @param jobGraph jobgraph requiring user artifacts
+ * @param blobClient client to upload artifacts with
+ * @throws IOException if the upload fails
+ */
+ private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+ Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream()
+ .map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
+ .collect(Collectors.toList());
+
+ Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
+ setUserArtifactBlobKeys(jobGraph, blobKeys);
+ }
+
+ /** Uploads every artifact not located on a distributed file system; returns (entry name, blob key) pairs. */
+ private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Collection<Tuple2<String, Path>> userArtifacts, BlobClient blobClient) throws IOException {
+ Collection<Tuple2<String, PermanentBlobKey>> blobKeys = new ArrayList<>(userArtifacts.size());
+ for (Tuple2<String, Path> userArtifact : userArtifacts) {
+ // only upload local files; entries on a distributed file system get no blob key here
+ // NOTE(review): an IOException from getFileSystem() is not caught, so it aborts the whole upload
+ if (!userArtifact.f1.getFileSystem().isDistributedFS()) {
+ final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, userArtifact.f1);
+ blobKeys.add(Tuple2.of(userArtifact.f0, blobKey));
+ }
+ }
+ return blobKeys;
+ }
+
+ /** Stores the blob keys in the {@link JobGraph}, then writes all user artifact entries to its configuration. */
+ private static void setUserArtifactBlobKeys(JobGraph jobGraph, Collection<Tuple2<String, PermanentBlobKey>> blobKeys) throws IOException {
+ for (Tuple2<String, PermanentBlobKey> blobKey : blobKeys) {
+ jobGraph.setUserArtifactBlobKey(blobKey.f0, blobKey.f1);
+ }
+ jobGraph.writeUserArtifactEntriesToConfiguration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 76d9bd6..27da3b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import akka.actor.ActorRef;
@@ -422,18 +424,10 @@ public class JobClient {
}
try {
- jobGraph.uploadUserJars(blobServerAddress, config);
- }
- catch (IOException e) {
- throw new JobSubmissionException(jobGraph.getJobID(),
- "Could not upload the program's JAR files to the JobManager.", e);
- }
-
- try {
- jobGraph.uploadUserArtifacts(blobServerAddress, config);
- } catch (IOException e) {
+ ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
+ } catch (FlinkException e) {
throw new JobSubmissionException(jobGraph.getJobID(),
- "Could not upload custom user artifacts to the job manager.", e);
+ "Could not upload job files.", e);
}
CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 9b95633..2783b09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -31,13 +32,13 @@ import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -154,23 +155,9 @@ public class JobSubmissionClientActor extends JobClientActor {
final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync(
(InetSocketAddress blobServerAddress) -> {
try {
- jobGraph.uploadUserJars(blobServerAddress, clientConfig);
- } catch (IOException e) {
- throw new CompletionException(
- new JobSubmissionException(
- jobGraph.getJobID(),
- "Could not upload the jar files to the job manager.",
- e));
- }
-
- try {
- jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig);
- } catch (IOException e) {
- throw new CompletionException(
- new JobSubmissionException(
- jobGraph.getJobID(),
- "Could not upload custom user artifacts to the job manager.",
- e));
+ ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
+ } catch (FlinkException e) {
+ throw new CompletionException(e);
}
},
getContext().dispatcher());
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 7231383..b3e03de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.util.InstantiationUtil;
@@ -32,12 +31,10 @@ import org.apache.flink.util.SerializedValue;
import java.io.IOException;
import java.io.Serializable;
-import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -552,68 +549,25 @@ public class JobGraph implements Serializable {
return this.userJarBlobKeys;
}
- /**
- * Uploads the previously added user JAR files to the job manager through
- * the job manager's BLOB server. The BLOB servers' address is given as a
- * parameter. This function issues a blocking call.
- *
- * @param blobServerAddress of the blob server to upload the jars to
- * @param blobClientConfig the blob client configuration
- * @throws IOException Thrown, if the file upload to the JobManager failed.
- */
- public void uploadUserJars(
- InetSocketAddress blobServerAddress,
- Configuration blobClientConfig) throws IOException {
- if (!userJars.isEmpty()) {
- List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
- blobServerAddress, blobClientConfig, jobID, userJars);
-
- for (PermanentBlobKey blobKey : blobKeys) {
- if (!userJarBlobKeys.contains(blobKey)) {
- userJarBlobKeys.add(blobKey);
- }
- }
- }
- }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
- /**
- * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them
- * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor.
- *
- * @param blobServerAddress of the blob server to upload the files to
- * @param blobClientConfig the blob client configuration
- * @throws IOException Thrown, if the file upload to the Blob server failed.
- */
- public void uploadUserArtifacts(
- InetSocketAddress blobServerAddress,
- Configuration blobClientConfig) throws IOException {
+ public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
+ byte[] serializedBlobKey;
+ serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
- Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
- Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
+ userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+ originalEntry.filePath,
+ originalEntry.isExecutable,
+ serializedBlobKey,
+ originalEntry.isZipped
+ ));
+ }
+ public void writeUserArtifactEntriesToConfiguration() {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
- Path filePath = new Path(userArtifact.getValue().filePath);
-
- try {
- if (filePath.getFileSystem().isDistributedFS()) {
- distributeViaDFS.add(userArtifact);
- } else {
- uploadToBlobServer.add(userArtifact);
- }
-
- } catch (IOException ex) {
- distributeViaDFS.add(userArtifact);
- }
- }
-
- uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
- for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
DistributedCache.writeFileInfoToConfig(
userArtifact.getKey(),
userArtifact.getValue(),
@@ -621,27 +575,4 @@ public class JobGraph implements Serializable {
);
}
}
-
- private void uploadViaBlob(
- InetSocketAddress blobServerAddress,
- Configuration clientConfig,
- Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer) throws IOException {
- if (!uploadToBlobServer.isEmpty()) {
- try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
- for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : uploadToBlobServer) {
- final PermanentBlobKey key = blobClient.uploadFile(jobID,
- new Path(userArtifact.getValue().filePath));
-
- DistributedCache.writeFileInfoToConfig(
- userArtifact.getKey(),
- new DistributedCache.DistributedCacheEntry(
- userArtifact.getValue().filePath,
- userArtifact.getValue().isExecutable,
- InstantiationUtil.serializeObject(key),
- userArtifact.getValue().isZipped),
- jobConfiguration);
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b89617b..4fab2b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,12 +25,11 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -90,7 +89,6 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -583,8 +581,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
- uploadUserArtifacts(job);
-
final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
try {
@@ -608,7 +604,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
- uploadUserArtifacts(job);
final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
@@ -631,15 +626,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
- private void uploadUserArtifacts(JobGraph job) throws JobExecutionException {
- try {
- final InetSocketAddress blobAddress = new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort());
- job.uploadUserArtifacts(blobAddress, miniClusterConfiguration.getConfiguration());
- } catch (IOException e) {
- throw new JobExecutionException(job.getJobID(), "Could not upload user artifacts", e);
- }
- }
-
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final DispatcherGateway dispatcherGateway;
try {
@@ -653,7 +639,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
- final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+ final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGateway);
+
+ final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
@@ -685,32 +673,19 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
- private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
- List<Path> userJars = job.getUserJars();
- if (!userJars.isEmpty()) {
- CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
- return jarUploadFuture.thenAccept(blobKeys -> {
- for (PermanentBlobKey blobKey : blobKeys) {
- job.addUserJarBlobKey(blobKey);
- }
- });
- } else {
- LOG.debug("No jars to upload for job {}.", job.getJobID());
- return CompletableFuture.completedFuture(null);
- }
+ private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
+ return blobServerAddressFuture.thenAccept(blobServerAddress -> {
+ try {
+ ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+ } catch (FlinkException e) {
+ throw new CompletionException(e);
+ }
+ });
}
- private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+ private CompletableFuture<InetSocketAddress> createBlobServerAddress(final DispatcherGateway currentDispatcherGateway) {
return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
- .thenApply(blobServerPort -> {
- InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
-
- try {
- return BlobClient.uploadFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
- }
- });
+ .thenApply(blobServerPort -> new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort));
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
new file mode 100644
index 0000000..dc14cb1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ClientUtils}, run against a local {@link BlobServer}.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+ @ClassRule
+ public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static BlobServer blobServer = null;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+ blobServer = new BlobServer(config, new VoidBlobStore());
+ blobServer.start();
+ }
+
+ @AfterClass
+ public static void teardown() throws IOException {
+ if (blobServer != null) {
+ blobServer.close();
+ }
+ }
+
+ @Test
+ public void uploadAndSetUserJars() throws Exception {
+ java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+ JobGraph jobGraph = new JobGraph();
+
+ Collection<Path> jars = Arrays.asList(
+ new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+ new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+ jars.forEach(jobGraph::addJar);
+
+ assertEquals(jars.size(), jobGraph.getUserJars().size());
+ assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+ ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+
+ assertEquals(jars.size(), jobGraph.getUserJars().size());
+ assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+ assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
+
+ for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
+ blobServer.getFile(jobGraph.getJobID(), blobKey); // throws if the blob cannot be retrieved from the server
+ }
+ }
+
+ @Test
+ public void uploadAndSetUserArtifacts() throws Exception {
+ java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+ JobGraph jobGraph = new JobGraph();
+
+ Collection<DistributedCache.DistributedCacheEntry> localArtifacts = Arrays.asList(
+ new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art1")).toString(), true, true),
+ new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art2")).toString(), true, false),
+ new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art3")).toString(), false, true),
+ new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art4")).toString(), true, false)
+ );
+
+ Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts = Arrays.asList(
+ new DistributedCache.DistributedCacheEntry("hdfs://localhost:1234/test", true, false)
+ );
+
+ for (DistributedCache.DistributedCacheEntry entry : localArtifacts) {
+ jobGraph.addUserArtifact(entry.filePath, entry);
+ }
+ for (DistributedCache.DistributedCacheEntry entry : distributedArtifacts) {
+ jobGraph.addUserArtifact(entry.filePath, entry);
+ }
+
+ final int totalNumArtifacts = localArtifacts.size() + distributedArtifacts.size();
+
+ assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+ assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+
+ ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+
+ assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+ assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+ assertEquals(distributedArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey == null).count());
+ // 1 unique key per local artifact, plus one shared null key if there are any distributed artifacts
+ assertEquals(localArtifacts.size() + (distributedArtifacts.isEmpty() ? 0 : 1), jobGraph.getUserArtifacts().values().stream().map(entry -> entry.blobKey).distinct().count());
+ for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
+ assertState(original, jobGraph.getUserArtifacts().get(original.filePath), false, jobGraph.getJobID());
+ }
+ for (DistributedCache.DistributedCacheEntry original : distributedArtifacts) {
+ assertState(original, jobGraph.getUserArtifacts().get(original.filePath), true, jobGraph.getJobID());
+ }
+ }
+
+ private static void assertState(DistributedCache.DistributedCacheEntry original, DistributedCache.DistributedCacheEntry actual, boolean isBlobKeyNull, JobID jobId) throws Exception {
+ assertEquals(original.isZipped, actual.isZipped);
+ assertEquals(original.isExecutable, actual.isExecutable);
+ assertEquals(original.filePath, actual.filePath);
+ assertEquals(isBlobKeyNull, actual.blobKey == null);
+ if (!isBlobKeyNull) {
+ blobServer.getFile(
+ jobId,
+ InstantiationUtil.<PermanentBlobKey>deserializeObject(actual.blobKey, ClientUtilsTest.class.getClassLoader()));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 227f6e3..160402b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -18,20 +18,23 @@
package org.apache.flink.runtime.jobgraph;
-import static org.junit.Assert.*;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
public class JobGraphTest extends TestLogger {
@Test
@@ -263,22 +266,6 @@ public class JobGraphTest extends TestLogger {
fail(e.getMessage());
}
}
-
- @Test
- public void testConfiguringDistributedCache() throws Exception {
- JobGraph testJob = new JobGraph("Test job");
- testJob.addUserArtifact("dfsFile", new DistributedCache.DistributedCacheEntry("hdfs://tmp/file", false));
-
- //it should never try to connect to that address
- testJob.uploadUserArtifacts(new InetSocketAddress("localhost", 1111), new Configuration());
-
- Configuration jobConfiguration = testJob.getJobConfiguration();
- assertEquals(1, jobConfiguration.getInteger("DISTRIBUTED_CACHE_FILE_NUM", -1));
- assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_DIR_1", true));
- assertEquals("dfsFile", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_NAME_1", ""));
- assertEquals("hdfs://tmp/file", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_PATH_1", ""));
- assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_EXE_1", true));
- }
private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
boolean seenFirst = false;
@@ -294,4 +281,32 @@ public class JobGraphTest extends TestLogger {
}
}
}
+
+ @Test
+ public void testSetUserArtifactBlobKey() throws IOException, ClassNotFoundException {
+ JobGraph jb = new JobGraph();
+ // one artifact per (isExecutable, isZipped) combination so every flag pairing is checked for preservation
+ final DistributedCache.DistributedCacheEntry[] entries = {
+ new DistributedCache.DistributedCacheEntry("p1", true, true),
+ new DistributedCache.DistributedCacheEntry("p2", true, false),
+ new DistributedCache.DistributedCacheEntry("p3", false, true),
+ new DistributedCache.DistributedCacheEntry("p4", false, false),
+ };
+ // register every artifact under its file path before any blob key is attached
+ for (DistributedCache.DistributedCacheEntry entry : entries) {
+ jb.addUserArtifact(entry.filePath, entry);
+ }
+ // attaching a blob key must store the (serialized) key and leave the entry's other attributes unchanged
+ for (DistributedCache.DistributedCacheEntry entry : entries) {
+ PermanentBlobKey blobKey = new PermanentBlobKey();
+ jb.setUserArtifactBlobKey(entry.filePath, blobKey);
+ // look the entry up again by the file path it was registered under
+ DistributedCache.DistributedCacheEntry jobGraphEntry = jb.getUserArtifacts().get(entry.filePath);
+ assertNotNull(jobGraphEntry);
+ assertEquals(blobKey, InstantiationUtil.deserializeObject(jobGraphEntry.blobKey, ClassLoader.getSystemClassLoader(), false));
+ assertEquals(entry.isExecutable, jobGraphEntry.isExecutable);
+ assertEquals(entry.isZipped, jobGraphEntry.isZipped);
+ assertEquals(entry.filePath, jobGraphEntry.filePath);
+ }
+ }
}