You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/29 14:43:13 UTC

flink git commit: [FLINK-7989][yarn] Do not upload the flink-dist jar twice

Repository: flink
Updated Branches:
  refs/heads/master 4daf9223a -> 520a74f4b


[FLINK-7989][yarn] Do not upload the flink-dist jar twice

We always add the dist.jar ourselves, but it could also be inside a shipped
folder such as the "lib/" folder and was then distributed multiple times.

This closes #4951.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/520a74f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/520a74f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/520a74f4

Branch: refs/heads/master
Commit: 520a74f4b683e49ec233f11a0d5af6cc96e9f1cd
Parents: 4daf922
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Nov 3 21:19:34 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Nov 29 15:43:04 2017 +0100

----------------------------------------------------------------------
 .../yarn/AbstractYarnClusterDescriptor.java     | 106 ++++++++++++-------
 1 file changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/520a74f4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 468c0c8..a910148 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -182,14 +182,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.flinkJarPath = localJarPath;
 	}
 
+	/**
+	 * Adds the given files to the list of files to ship.
+	 *
+	 * <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
+	 * {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, ApplicationId, List, Map, StringBuilder)}
+	 * since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
+	 *
+	 * @param shipFiles files to ship
+	 */
 	public void addShipFiles(List<File> shipFiles) {
-		for (File shipFile: shipFiles) {
-			// remove uberjar from ship list (by default everything in the lib/ folder is added to
-			// the list of files to ship, but we handle the uberjar separately.
-			if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
-				this.shipFiles.add(shipFile);
-			}
-		}
+		this.shipFiles.addAll(shipFiles);
 	}
 
 	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
@@ -1048,6 +1051,27 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return resource.f0;
 	}
 
+	/**
+	 * Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
+	 * for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately.
+	 *
+	 * @param shipFiles
+	 * 		files to upload
+	 * @param fs
+	 * 		file system to upload to
+	 * @param targetHomeDir
+	 * 		remote home directory to upload to
+	 * @param appId
+	 * 		application ID
+	 * @param remotePaths
+	 * 		paths of the remote resources (uploaded resources will be added)
+	 * @param localResources
+	 * 		map of resources (uploaded resources will be added)
+	 * @param envShipFileList
+	 * 		list of shipped files in a format understood by {@link Utils#createTaskExecutorContext}
+	 *
+	 * @return list of class paths with the the proper resource keys from the registration
+	 */
 	static List<String> uploadAndRegisterFiles(
 			Collection<File> shipFiles,
 			FileSystem fs,
@@ -1068,40 +1092,48 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					@Override
 					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
 						throws IOException {
-						java.nio.file.Path relativePath = parentPath.relativize(file);
-
-						String key = relativePath.toString();
-						try {
-							Path remotePath = setupSingleLocalResource(
-								key,
-								fs,
-								appId,
-								new Path(file.toUri()),
-								localResources,
-								targetHomeDir,
-								relativePath.getParent().toString());
-							remotePaths.add(remotePath);
-							envShipFileList.append(key).append("=").append(remotePath).append(",");
-
-							// add files to the classpath
-							classPaths.add(key);
-
-							return FileVisitResult.CONTINUE;
-						} catch (URISyntaxException e) {
-							throw new IOException(e);
+
+						if (!(file.getFileName().startsWith("flink-dist") &&
+								file.getFileName().endsWith("jar"))) {
+
+							java.nio.file.Path relativePath = parentPath.relativize(file);
+
+							String key = relativePath.toString();
+							try {
+								Path remotePath = setupSingleLocalResource(
+									key,
+									fs,
+									appId,
+									new Path(file.toUri()),
+									localResources,
+									targetHomeDir,
+									relativePath.getParent().toString());
+								remotePaths.add(remotePath);
+								envShipFileList.append(key).append("=")
+									.append(remotePath).append(",");
+
+								// add files to the classpath
+								classPaths.add(key);
+							} catch (URISyntaxException e) {
+								throw new IOException(e);
+							}
 						}
+
+						return FileVisitResult.CONTINUE;
 					}
 				});
 			} else {
-				Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-				String key = shipFile.getName();
-				Path remotePath = setupSingleLocalResource(
-					key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
-				remotePaths.add(remotePath);
-				envShipFileList.append(key).append("=").append(remotePath).append(",");
-
-				// add files to the classpath
-				classPaths.add(key);
+				if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
+					Path shipLocalPath = new Path(shipFile.toURI());
+					String key = shipFile.getName();
+					Path remotePath = setupSingleLocalResource(
+						key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
+					remotePaths.add(remotePath);
+					envShipFileList.append(key).append("=").append(remotePath).append(",");
+
+					// add files to the classpath
+					classPaths.add(key);
+				}
 			}
 
 		}