You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/29 14:43:13 UTC
flink git commit: [FLINK-7989][yarn] Do not upload the flink-dist jar twice
Repository: flink
Updated Branches:
refs/heads/master 4daf9223a -> 520a74f4b
[FLINK-7989][yarn] Do not upload the flink-dist jar twice
We always add the dist.jar ourselves, but it could also be inside a shipped
folder such as the "lib/" folder and was then distributed multiple times.
This closes #4951.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/520a74f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/520a74f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/520a74f4
Branch: refs/heads/master
Commit: 520a74f4b683e49ec233f11a0d5af6cc96e9f1cd
Parents: 4daf922
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Nov 3 21:19:34 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Nov 29 15:43:04 2017 +0100
----------------------------------------------------------------------
.../yarn/AbstractYarnClusterDescriptor.java | 106 ++++++++++++-------
1 file changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/520a74f4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 468c0c8..a910148 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -182,14 +182,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this.flinkJarPath = localJarPath;
}
+ /**
+ * Adds the given files to the list of files to ship.
+ *
+ * <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
+ * {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, ApplicationId, List, Map, StringBuilder)}
+ * since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
+ *
+ * @param shipFiles files to ship
+ */
public void addShipFiles(List<File> shipFiles) {
- for (File shipFile: shipFiles) {
- // remove uberjar from ship list (by default everything in the lib/ folder is added to
- // the list of files to ship, but we handle the uberjar separately.
- if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
- this.shipFiles.add(shipFile);
- }
- }
+ this.shipFiles.addAll(shipFiles);
}
public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
@@ -1048,6 +1051,27 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return resource.f0;
}
+ /**
+ * Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
+ * for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately.
+ *
+ * @param shipFiles
+ * files to upload
+ * @param fs
+ * file system to upload to
+ * @param targetHomeDir
+ * remote home directory to upload to
+ * @param appId
+ * application ID
+ * @param remotePaths
+ * paths of the remote resources (uploaded resources will be added)
+ * @param localResources
+ * map of resources (uploaded resources will be added)
+ * @param envShipFileList
+ * list of shipped files in a format understood by {@link Utils#createTaskExecutorContext}
+ *
+ * @return list of class paths with the proper resource keys from the registration
+ */
static List<String> uploadAndRegisterFiles(
Collection<File> shipFiles,
FileSystem fs,
@@ -1068,40 +1092,48 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
@Override
public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
throws IOException {
- java.nio.file.Path relativePath = parentPath.relativize(file);
-
- String key = relativePath.toString();
- try {
- Path remotePath = setupSingleLocalResource(
- key,
- fs,
- appId,
- new Path(file.toUri()),
- localResources,
- targetHomeDir,
- relativePath.getParent().toString());
- remotePaths.add(remotePath);
- envShipFileList.append(key).append("=").append(remotePath).append(",");
-
- // add files to the classpath
- classPaths.add(key);
-
- return FileVisitResult.CONTINUE;
- } catch (URISyntaxException e) {
- throw new IOException(e);
+
+ if (!(file.getFileName().startsWith("flink-dist") &&
+ file.getFileName().endsWith("jar"))) {
+
+ java.nio.file.Path relativePath = parentPath.relativize(file);
+
+ String key = relativePath.toString();
+ try {
+ Path remotePath = setupSingleLocalResource(
+ key,
+ fs,
+ appId,
+ new Path(file.toUri()),
+ localResources,
+ targetHomeDir,
+ relativePath.getParent().toString());
+ remotePaths.add(remotePath);
+ envShipFileList.append(key).append("=")
+ .append(remotePath).append(",");
+
+ // add files to the classpath
+ classPaths.add(key);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
}
+
+ return FileVisitResult.CONTINUE;
}
});
} else {
- Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
- String key = shipFile.getName();
- Path remotePath = setupSingleLocalResource(
- key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
- remotePaths.add(remotePath);
- envShipFileList.append(key).append("=").append(remotePath).append(",");
-
- // add files to the classpath
- classPaths.add(key);
+ if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
+ Path shipLocalPath = new Path(shipFile.toURI());
+ String key = shipFile.getName();
+ Path remotePath = setupSingleLocalResource(
+ key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
+ remotePaths.add(remotePath);
+ envShipFileList.append(key).append("=").append(remotePath).append(",");
+
+ // add files to the classpath
+ classPaths.add(key);
+ }
}
}