You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/15 17:57:10 UTC
[flink] branch master updated: [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 17e2747 [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
17e2747 is described below
commit 17e2747e575cc4d9847be76b5ce9a75a7b6707f7
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Fri Jul 26 17:04:57 2019 +0200
[FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
This closes #9022.
---
.../flink/yarn/AbstractYarnClusterDescriptor.java | 99 +++++++++++-----------
.../flink/yarn/YarnClusterDescriptorTest.java | 6 +-
.../org/apache/flink/yarn/YarnFileStageTest.java | 13 ++-
3 files changed, 66 insertions(+), 52 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 5b2bc2a..23d1d3a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -83,7 +83,6 @@ import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.net.URISyntaxException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
@@ -1086,8 +1085,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
Path localSrcPath,
Map<String, LocalResource> localResources,
Path targetHomeDir,
- String relativeTargetPath) throws IOException, URISyntaxException {
-
+ String relativeTargetPath) throws IOException {
Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
fs,
appId.toString(),
@@ -1101,6 +1099,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
/**
+ * Match file name for "<tt>flink-dist*.jar</tt>" pattern.
+ *
+ * @param fileName file name to check
+ * @return true if file is a dist jar
+ */
+ private static boolean isDistJar(String fileName) {
+ return fileName.startsWith("flink-dist") && fileName.endsWith("jar");
+ }
+
+ /**
* Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
* for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately.
*
@@ -1128,64 +1136,59 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
ApplicationId appId,
List<Path> remotePaths,
Map<String, LocalResource> localResources,
- StringBuilder envShipFileList) throws IOException, URISyntaxException {
-
- final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
+ StringBuilder envShipFileList) throws IOException {
+ final List<Path> localPaths = new ArrayList<>();
+ final List<Path> relativePaths = new ArrayList<>();
for (File shipFile : shipFiles) {
if (shipFile.isDirectory()) {
// add directories to the classpath
- java.nio.file.Path shipPath = shipFile.toPath();
+ final java.nio.file.Path shipPath = shipFile.toPath();
final java.nio.file.Path parentPath = shipPath.getParent();
-
Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
@Override
- public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
- throws IOException {
- String fileName = file.getFileName().toString();
- if (!(fileName.startsWith("flink-dist") &&
- fileName.endsWith("jar"))) {
-
- java.nio.file.Path relativePath = parentPath.relativize(file);
-
- String key = relativePath.toString();
- try {
- Path remotePath = setupSingleLocalResource(
- key,
- fs,
- appId,
- new Path(file.toUri()),
- localResources,
- targetHomeDir,
- relativePath.getParent().toString());
- remotePaths.add(remotePath);
- envShipFileList.append(key).append("=")
- .append(remotePath).append(",");
-
- // add files to the classpath
- classPaths.add(key);
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
- }
-
+ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
+ localPaths.add(new Path(file.toUri()));
+ relativePaths.add(new Path(parentPath.relativize(file).toString()));
return FileVisitResult.CONTINUE;
}
});
} else {
- if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
- Path shipLocalPath = new Path(shipFile.toURI());
- String key = shipFile.getName();
- Path remotePath = setupSingleLocalResource(
- key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
- remotePaths.add(remotePath);
- envShipFileList.append(key).append("=").append(remotePath).append(",");
-
- // add files to the classpath
- classPaths.add(key);
- }
+ localPaths.add(new Path(shipFile.toURI()));
+ relativePaths.add(new Path(shipFile.getName()));
}
+ }
+ final Set<String> archives = new HashSet<>();
+ final Set<String> resources = new HashSet<>();
+ for (int i = 0; i < localPaths.size(); i++) {
+ final Path localPath = localPaths.get(i);
+ final Path relativePath = relativePaths.get(i);
+ if (!isDistJar(relativePath.getName())) {
+ final String key = relativePath.toString();
+ final Path remotePath = setupSingleLocalResource(
+ key,
+ fs,
+ appId,
+ localPath,
+ localResources,
+ targetHomeDir,
+ relativePath.getParent().toString());
+ remotePaths.add(remotePath);
+ envShipFileList.append(key).append("=").append(remotePath).append(",");
+ // add files to the classpath
+ if (key.endsWith("jar")) {
+ archives.add(relativePath.toString());
+ } else {
+ resources.add(relativePath.getParent().toString());
+ }
+ }
}
+
+ // construct classpath, we always want resource directories to go first, we also sort
+ // both resources and archives in order to make classpath deterministic
+ final ArrayList<String> classPaths = new ArrayList<>();
+ resources.stream().sorted().forEach(classPaths::add);
+ archives.stream().sorted().forEach(classPaths::add);
return classPaths;
}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 842ebb6..4940f12 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -459,11 +459,11 @@ public class YarnClusterDescriptorTest extends TestLogger {
testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR);
}
- public void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception {
+ private void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception {
try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor()) {
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
File libFile = new File(libFolder, "libFile.jar");
- libFile.createNewFile();
+ assertTrue(libFile.createNewFile());
Set<File> effectiveShipFiles = new HashSet<>();
@@ -487,7 +487,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
}
@Test
- public void testEnvironmentEmptyPluginsShipping() throws Exception {
+ public void testEnvironmentEmptyPluginsShipping() {
try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor()) {
File pluginsFolder = Paths.get(temporaryFolder.getRoot().getAbsolutePath(), "s0m3_p4th_th4t_sh0uld_n0t_3x1sts").toFile();
Set<File> effectiveShipFiles = new HashSet<>();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
index 527782c..d0fad6d 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
@@ -164,6 +165,7 @@ public class YarnFileStageTest extends TestLogger {
srcFiles.put("2", "Hello 2");
srcFiles.put("nested/3", "Hello nested/3");
srcFiles.put("nested/4/5", "Hello nested/4/5");
+ srcFiles.put("test.jar", "JAR Content");
for (Map.Entry<String, String> src : srcFiles.entrySet()) {
File file = new File(srcDir, src.getKey());
//noinspection ResultOfMethodCallIgnored
@@ -177,7 +179,7 @@ public class YarnFileStageTest extends TestLogger {
try {
List<Path> remotePaths = new ArrayList<>();
HashMap<String, LocalResource> localResources = new HashMap<>();
- AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
+ final List<String> classpath = AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
Collections.singletonList(new File(srcPath.toUri().getPath())),
targetFileSystem,
targetDir,
@@ -185,6 +187,15 @@ public class YarnFileStageTest extends TestLogger {
remotePaths,
localResources,
new StringBuilder());
+
+ assertThat(
+ classpath,
+ Matchers.containsInAnyOrder(
+ srcDir.getName(),
+ srcDir.getName() + "/nested",
+ srcDir.getName() + "/nested/4",
+ srcDir.getName() + "/test.jar"));
+
assertEquals(srcFiles.size(), localResources.size());
Path workDir = ConverterUtils