You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/15 17:57:10 UTC

[flink] branch master updated: [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 17e2747  [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
17e2747 is described below

commit 17e2747e575cc4d9847be76b5ce9a75a7b6707f7
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Fri Jul 26 17:04:57 2019 +0200

    [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
    
    This closes #9022.
---
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 99 +++++++++++-----------
 .../flink/yarn/YarnClusterDescriptorTest.java      |  6 +-
 .../org/apache/flink/yarn/YarnFileStageTest.java   | 13 ++-
 3 files changed, 66 insertions(+), 52 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 5b2bc2a..23d1d3a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -83,7 +83,6 @@ import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.URISyntaxException;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.SimpleFileVisitor;
@@ -1086,8 +1085,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			Path localSrcPath,
 			Map<String, LocalResource> localResources,
 			Path targetHomeDir,
-			String relativeTargetPath) throws IOException, URISyntaxException {
-
+			String relativeTargetPath) throws IOException {
 		Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
 			fs,
 			appId.toString(),
@@ -1101,6 +1099,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	/**
+	 * Checks a file name against the "<tt>flink-dist*.jar</tt>" pattern using a prefix and a suffix test.
+	 *
+	 * @param fileName file name to check
+	 * @return true if the name starts with "flink-dist" and ends with "jar" (a "." before "jar" is not required)
+	 */
+	private static boolean isDistJar(String fileName) {
+		return fileName.startsWith("flink-dist") && fileName.endsWith("jar");
+	}
+
+	/**
 	 * Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
 	 * for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately.
 	 *
@@ -1128,64 +1136,59 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			ApplicationId appId,
 			List<Path> remotePaths,
 			Map<String, LocalResource> localResources,
-			StringBuilder envShipFileList) throws IOException, URISyntaxException {
-
-		final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
+			StringBuilder envShipFileList) throws IOException {
+		final List<Path> localPaths = new ArrayList<>();
+		final List<Path> relativePaths = new ArrayList<>();
 		for (File shipFile : shipFiles) {
 			if (shipFile.isDirectory()) {
 				// add directories to the classpath
-				java.nio.file.Path shipPath = shipFile.toPath();
+				final java.nio.file.Path shipPath = shipFile.toPath();
 				final java.nio.file.Path parentPath = shipPath.getParent();
-
 				Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
 					@Override
-					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
-						throws IOException {
-						String fileName = file.getFileName().toString();
-						if (!(fileName.startsWith("flink-dist") &&
-								fileName.endsWith("jar"))) {
-
-							java.nio.file.Path relativePath = parentPath.relativize(file);
-
-							String key = relativePath.toString();
-							try {
-								Path remotePath = setupSingleLocalResource(
-									key,
-									fs,
-									appId,
-									new Path(file.toUri()),
-									localResources,
-									targetHomeDir,
-									relativePath.getParent().toString());
-								remotePaths.add(remotePath);
-								envShipFileList.append(key).append("=")
-									.append(remotePath).append(",");
-
-								// add files to the classpath
-								classPaths.add(key);
-							} catch (URISyntaxException e) {
-								throw new IOException(e);
-							}
-						}
-
+					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
+						localPaths.add(new Path(file.toUri()));
+						relativePaths.add(new Path(parentPath.relativize(file).toString()));
 						return FileVisitResult.CONTINUE;
 					}
 				});
 			} else {
-				if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
-					Path shipLocalPath = new Path(shipFile.toURI());
-					String key = shipFile.getName();
-					Path remotePath = setupSingleLocalResource(
-						key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
-					remotePaths.add(remotePath);
-					envShipFileList.append(key).append("=").append(remotePath).append(",");
-
-					// add files to the classpath
-					classPaths.add(key);
-				}
+				localPaths.add(new Path(shipFile.toURI()));
+				relativePaths.add(new Path(shipFile.getName()));
 			}
+		}
 
+		final Set<String> archives = new HashSet<>();
+		final Set<String> resources = new HashSet<>();
+		for (int i = 0; i < localPaths.size(); i++) {
+			final Path localPath = localPaths.get(i);
+			final Path relativePath = relativePaths.get(i);
+			if (!isDistJar(relativePath.getName())) {
+				final String key = relativePath.toString();
+				final Path remotePath = setupSingleLocalResource(
+					key,
+					fs,
+					appId,
+					localPath,
+					localResources,
+					targetHomeDir,
+					relativePath.getParent().toString());
+				remotePaths.add(remotePath);
+				envShipFileList.append(key).append("=").append(remotePath).append(",");
+				// add files to the classpath
+				if (key.endsWith("jar")) {
+					archives.add(relativePath.toString());
+				} else {
+					resources.add(relativePath.getParent().toString());
+				}
+			}
 		}
+
+		// construct classpath, we always want resource directories to go first, we also sort
+		// both resources and archives in order to make classpath deterministic
+		final ArrayList<String> classPaths = new ArrayList<>();
+		resources.stream().sorted().forEach(classPaths::add);
+		archives.stream().sorted().forEach(classPaths::add);
 		return classPaths;
 	}
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 842ebb6..4940f12 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -459,11 +459,11 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR);
 	}
 
-	public void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception {
+	private void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception {
 		try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor()) {
 			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
 			File libFile = new File(libFolder, "libFile.jar");
-			libFile.createNewFile();
+			assertTrue(libFile.createNewFile());
 
 			Set<File> effectiveShipFiles = new HashSet<>();
 
@@ -487,7 +487,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	}
 
 	@Test
-	public void testEnvironmentEmptyPluginsShipping() throws Exception {
+	public void testEnvironmentEmptyPluginsShipping() {
 		try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor()) {
 			File pluginsFolder = Paths.get(temporaryFolder.getRoot().getAbsolutePath(), "s0m3_p4th_th4t_sh0uld_n0t_3x1sts").toFile();
 			Set<File> effectiveShipFiles = new HashSet<>();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
index 527782c..d0fad6d 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
@@ -164,6 +165,7 @@ public class YarnFileStageTest extends TestLogger {
 		srcFiles.put("2", "Hello 2");
 		srcFiles.put("nested/3", "Hello nested/3");
 		srcFiles.put("nested/4/5", "Hello nested/4/5");
+		srcFiles.put("test.jar", "JAR Content");
 		for (Map.Entry<String, String> src : srcFiles.entrySet()) {
 			File file = new File(srcDir, src.getKey());
 			//noinspection ResultOfMethodCallIgnored
@@ -177,7 +179,7 @@ public class YarnFileStageTest extends TestLogger {
 		try {
 			List<Path> remotePaths = new ArrayList<>();
 			HashMap<String, LocalResource> localResources = new HashMap<>();
-			AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
+			final List<String> classpath = AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
 				Collections.singletonList(new File(srcPath.toUri().getPath())),
 				targetFileSystem,
 				targetDir,
@@ -185,6 +187,15 @@ public class YarnFileStageTest extends TestLogger {
 				remotePaths,
 				localResources,
 				new StringBuilder());
+
+			assertThat(
+				classpath,
+				Matchers.containsInAnyOrder(
+					srcDir.getName(),
+					srcDir.getName() + "/nested",
+					srcDir.getName() + "/nested/4",
+					srcDir.getName() + "/test.jar"));
+
 			assertEquals(srcFiles.size(), localResources.size());
 
 			Path workDir = ConverterUtils