You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/03 12:58:13 UTC

[GitHub] [flink] kl0u commented on a change in pull request #12791: [FLINK-18362][FLINK-13838][yarn] Add yarn.ship-archives to support LocalResourceType.ARCHIVE

kl0u commented on a change in pull request #12791:
URL: https://github.com/apache/flink/pull/12791#discussion_r449564490



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
##########
@@ -38,27 +39,41 @@
 class YarnLocalResourceDescriptor {
 
 	private static final String STRING_FORMAT = "YarnLocalResourceDescriptor{" +
-		"key=%s, path=%s, size=%d, modificationTime=%d, visibility=%s}";
+		"key=%s, path=%s, size=%d, modificationTime=%d, visibility=%s, type=%s}";
 	private static final Pattern LOCAL_RESOURCE_DESC_FORMAT = Pattern.compile("YarnLocalResourceDescriptor\\{" +
-		"key=(\\S+), path=(\\S+), size=([\\d]+), modificationTime=([\\d]+), visibility=(\\S+)}");
+		"key=(\\S+), path=(\\S+), size=([\\d]+), modificationTime=([\\d]+), visibility=(\\S+), type=(\\S+)}");
 
 	private final String resourceKey;
 	private final Path path;
 	private final long size;
 	private final long modificationTime;
 	private final LocalResourceVisibility visibility;
+	private final LocalResourceType resourceType;
 
 	YarnLocalResourceDescriptor(
 			String resourceKey,
 			Path path,
 			long resourceSize,
 			long modificationTime,
-			LocalResourceVisibility visibility) {
+			LocalResourceVisibility visibility,
+			LocalResourceType resourceType
+			) {

Review comment:
       the `) {` can go to the previous line with the last argument. 

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
##########
@@ -112,12 +134,14 @@ static YarnLocalResourceDescriptor fromFileStatus(
 				fileStatus.getPath(),
 				fileStatus.getLen(),
 				fileStatus.getModificationTime(),
-				visibility);
+				visibility,
+				resourceType
+			);

Review comment:
       same as above

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -139,6 +140,21 @@ public void close() {
 		IOUtils.closeQuietly(fileSystem);
 	}
 

Review comment:
       Please add javadoc. You can copy from the method below.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -155,7 +171,8 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
 			final Path resourcePath,
 			final String relativeDstPath,
 			final boolean whetherToAddToRemotePaths,
-			final boolean whetherToAddToEnvShipResourceList) throws IOException {
+			final boolean whetherToAddToEnvShipResourceList,
+			final LocalResourceType resourceType) throws IOException {

Review comment:
       Update the javadoc to include the new parameter.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -187,6 +191,13 @@ public YarnClusterDescriptor(
 		return files.isEmpty() ? Optional.empty() : Optional.of(files);
 	}
 
+	private Optional<List<File>> decodeArchivesToShipToCluster(final Configuration configuration) {

Review comment:
       The `decodeArchivesToShipToCluster` and `decodeDirsToShipToCluster` can be merged into a single method that takes as argument the name of the config option, right?

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -206,6 +225,13 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
 		return Tuple2.of(dst, fss[0].getModificationTime());
 	}
 
+	List<String> registerMultipleLocalResources(

Review comment:
       Javadoc.

##########
File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
##########
@@ -55,6 +58,8 @@ public void testFromString() throws Exception {
 		assertThat(newLocalResourceDesc.getSize(), is(size));
 		assertThat(newLocalResourceDesc.getModificationTime(), is(ts));
 		assertThat(newLocalResourceDesc.getVisibility(), is(LocalResourceVisibility.PUBLIC));
+		assertThat(newLocalResourceDesc.getResourceType(), is(LocalResourceType.FILE));
+

Review comment:
       unnecessary empty line. Please remove it.

##########
File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
##########
@@ -46,7 +47,9 @@ public void testFromString() throws Exception {
 			path,
 			size,
 			ts,
-			LocalResourceVisibility.PUBLIC);
+			LocalResourceVisibility.PUBLIC,
+			LocalResourceType.FILE
+		);

Review comment:
       Same as in the earlier comments: the `);` can go to the previous line with the last argument.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -220,7 +246,9 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
 	 */
 	List<String> registerMultipleLocalResources(
 			final Collection<Path> shipFiles,
-			final String localResourcesDirectory) throws IOException {
+			final String localResourcesDirectory,

Review comment:
       Update javadoc.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
##########
@@ -94,7 +113,9 @@ static YarnLocalResourceDescriptor fromString(String desc) throws Exception {
 				new Path(m.group(2)),
 				Long.parseLong(m.group(3)),
 				Long.parseLong(m.group(4)),
-				LocalResourceVisibility.valueOf(m.group(5)));
+				LocalResourceVisibility.valueOf(m.group(5)),
+				LocalResourceType.valueOf(m.group(6))
+			);

Review comment:
       `);` to previous line.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -776,6 +810,12 @@ private ApplicationReport startAppMaster(
 					Path.CUR_DIR);
 		}
 
+		if (!shipArchives.isEmpty()) {
+			fileUploader.registerMultipleLocalResources(

Review comment:
       Why not put one argument per line? Normally we go for either all arguments on one line, or one per line. :)

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
##########
@@ -38,27 +39,41 @@
 class YarnLocalResourceDescriptor {
 
 	private static final String STRING_FORMAT = "YarnLocalResourceDescriptor{" +
-		"key=%s, path=%s, size=%d, modificationTime=%d, visibility=%s}";
+		"key=%s, path=%s, size=%d, modificationTime=%d, visibility=%s, type=%s}";
 	private static final Pattern LOCAL_RESOURCE_DESC_FORMAT = Pattern.compile("YarnLocalResourceDescriptor\\{" +
-		"key=(\\S+), path=(\\S+), size=([\\d]+), modificationTime=([\\d]+), visibility=(\\S+)}");
+		"key=(\\S+), path=(\\S+), size=([\\d]+), modificationTime=([\\d]+), visibility=(\\S+), type=(\\S+)}");
 
 	private final String resourceKey;
 	private final Path path;
 	private final long size;
 	private final long modificationTime;
 	private final LocalResourceVisibility visibility;
+	private final LocalResourceType resourceType;
 
 	YarnLocalResourceDescriptor(
 			String resourceKey,
 			Path path,
 			long resourceSize,
 			long modificationTime,
-			LocalResourceVisibility visibility) {
+			LocalResourceVisibility visibility,
+			LocalResourceType resourceType
+			) {
 		this.resourceKey = checkNotNull(resourceKey);
 		this.path = checkNotNull(path);
 		this.size = resourceSize;
 		this.modificationTime = modificationTime;
 		this.visibility = checkNotNull(visibility);
+		this.resourceType = resourceType;
+	}
+
+	YarnLocalResourceDescriptor(
+		String resourceKey,

Review comment:
       Add one more tab of indentation so that the args are clearly separated from the body; the `) {` can then go on the same line as the last argument.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -220,7 +246,9 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
 	 */
 	List<String> registerMultipleLocalResources(
 			final Collection<Path> shipFiles,
-			final String localResourcesDirectory) throws IOException {
+			final String localResourcesDirectory,
+			LocalResourceType resourceType

Review comment:
       Add `final` to the parameter (as done above) and also the closing `) throws IOException {` can be on the same line as the last parameter. 

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -267,6 +278,29 @@ public void addShipFiles(List<File> shipFiles) {
 		this.shipFiles.addAll(shipFiles);
 	}
 
+	/**
+	 * Adds the given files to the list of archives to ship.
+	 **
+	 * @param shipArchives archives to ship
+	 */
+	public void addShipArchives(List<File> shipArchives) {
+		checkArgument(isArchiveOnlyIncludedInShipArchiveFiles(shipArchives), "Non-archive files are included.");
+		this.shipArchives.addAll(shipArchives);
+	}
+
+	private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
+		return shipFiles.stream()
+			.filter(File::isFile)
+			.map(File::getName)
+			.map(String::toLowerCase)
+			.allMatch(name -> name.endsWith(".tar.gz") ||
+				name.endsWith(".tar") ||
+				name.endsWith(".tgz") ||
+				name.endsWith(".dst") ||
+				name.endsWith(".jar") ||

Review comment:
       Do we want to allow jars here? This means that they will be un-jarred at the other end.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org