You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/07/30 12:14:41 UTC
[flink] branch master updated: [FLINK-18362][yarn] Fix mistakenly
merged commit 0e10fd5b8ee0
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a0227e2 [FLINK-18362][yarn] Fix mistakenly merged commit 0e10fd5b8ee0
a0227e2 is described below
commit a0227e20430ee9eaff59464023de2385378f71ea
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Jul 30 14:08:43 2020 +0200
[FLINK-18362][yarn] Fix mistakenly merged commit 0e10fd5b8ee0
---
.../generated/yarn_config_configuration.html | 2 +-
.../test/java/org/apache/flink/yarn/UtilsTest.java | 4 +-
.../java/org/apache/flink/yarn/YARNITCase.java | 14 +-
.../flink/yarn/testjob/YarnTestArchiveJob.java | 146 +++++++++++++++++++++
.../flink/yarn/YarnApplicationFileUploader.java | 87 ++++--------
.../apache/flink/yarn/YarnClusterDescriptor.java | 53 ++++----
.../flink/yarn/YarnLocalResourceDescriptor.java | 21 +--
.../yarn/configuration/YarnConfigOptions.java | 3 +-
.../test/java/org/apache/flink/yarn/UtilsTest.java | 1 -
.../org/apache/flink/yarn/YarnFileStageTest.java | 9 +-
.../yarn/YarnLocalResourceDescriptionTest.java | 4 +-
11 files changed, 232 insertions(+), 112 deletions(-)
diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 8126629..7173a0e 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -138,7 +138,7 @@
<td><h5>yarn.ship-archives</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List<String></td>
- <td>A semicolon-separated list of archives to be shipped to the YARN cluster. They will be un-packed when localizing.</td>
+ <td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".</td>
</tr>
<tr>
<td><h5>yarn.ship-directories</h5></td>
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 76aac0e..a2ad133 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -100,7 +101,8 @@ public class UtilsTest extends TestLogger {
new Path(root.toURI()),
0,
System.currentTimeMillis(),
- LocalResourceVisibility.APPLICATION).toString());
+ LocalResourceVisibility.APPLICATION,
+ LocalResourceType.FILE).toString());
env = Collections.unmodifiableMap(env);
File credentialFile = temporaryFolder.newFile("container_tokens");
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 88d91ca..4a9db23 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.testjob.YarnTestArchiveJob;
import org.apache.flink.yarn.testjob.YarnTestCacheJob;
import org.apache.flink.yarn.util.TestUtils;
@@ -39,7 +40,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
@@ -63,6 +66,9 @@ public class YARNITCase extends YarnTestBase {
private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
private static final int sleepIntervalInMS = 100;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@BeforeClass
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
@@ -101,10 +107,16 @@ public class YARNITCase extends YarnTestBase {
final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(remoteLib.toString()));
-
runTest(() -> deployPerJob(flinkConfig, getTestingJobGraph(), false));
}
+ @Test
+ public void testPerJobWithArchive() throws Exception {
+ final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
+ final JobGraph archiveJobGraph = YarnTestArchiveJob.getArchiveJobGraph(tmp.newFolder(), flinkConfig);
+ runTest(() -> deployPerJob(flinkConfig, archiveJobGraph, true));
+ }
+
private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolean withDist) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = withDist
? createYarnClusterDescriptor(configuration)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestArchiveJob.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestArchiveJob.java
new file mode 100644
index 0000000..0febb3f
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestArchiveJob.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.testjob;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Testing job for localizing resources of LocalResourceType.ARCHIVE in per job cluster mode.
+ */
+public class YarnTestArchiveJob {
+ private static final List<String> LIST = ImmutableList.of("test1", "test2");
+
+ private static final Map<String, String> srcFiles = new HashMap<String, String>() {{
+ put("local1.txt", "Local text Content1");
+ put("local2.txt", "Local text Content2");
+ }};
+
+ private static void archiveFilesInDirectory(File directory, String target) throws IOException {
+ for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
+ Files.write(
+ Paths.get(directory.getAbsolutePath() + File.separator + entry.getKey()),
+ entry.getValue().getBytes());
+ }
+
+ try (FileOutputStream fos = new FileOutputStream(target);
+ GzipCompressorOutputStream gos = new GzipCompressorOutputStream(new BufferedOutputStream(fos));
+ TarArchiveOutputStream taros = new TarArchiveOutputStream(gos)) {
+
+ taros.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+ for (File f : directory.listFiles()) {
+ taros.putArchiveEntry(new TarArchiveEntry(f, directory.getName() + File.separator + f.getName()));
+
+ try (FileInputStream fis = new FileInputStream(f); BufferedInputStream bis = new BufferedInputStream(fis)) {
+ IOUtils.copy(bis, taros);
+ taros.closeArchiveEntry();
+ }
+ }
+ }
+
+ for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
+ Files.delete(Paths.get(directory.getAbsolutePath() + File.separator + entry.getKey()));
+ }
+
+ }
+
+ public static JobGraph getArchiveJobGraph(File testDirectory, Configuration config) throws IOException {
+
+ final String archive = testDirectory.getAbsolutePath().concat(".tar.gz");
+ archiveFilesInDirectory(testDirectory, archive);
+ config.set(YarnConfigOptions.SHIP_ARCHIVES, Collections.singletonList(archive));
+
+ final String localizedPath = testDirectory.getName().concat(".tar.gz") + File.separator + testDirectory.getName();
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ env.addSource(new SourceFunctionWithArchive<>(LIST, localizedPath, TypeInformation.of(String.class)))
+ .addSink(new DiscardingSink<>());
+
+ return env.getStreamGraph().getJobGraph();
+ }
+
+ private static class SourceFunctionWithArchive<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
+
+ private final List<T> inputDataset;
+ private final String resourcePath;
+ private final TypeInformation<T> returnType;
+
+ SourceFunctionWithArchive(List<T> inputDataset, String resourcePath, TypeInformation<T> returnType) {
+ this.inputDataset = inputDataset;
+ this.resourcePath = resourcePath;
+ this.returnType = returnType;
+ }
+
+ public void open(Configuration parameters) throws Exception {
+ for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
+ Path path = Paths.get(resourcePath + File.separator + entry.getKey());
+ String content = new String(Files.readAllBytes(path));
+ checkArgument(entry.getValue().equals(content), "The content of the unpacked file should be identical to the original file's.");
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> ctx) {
+ for (T t : inputDataset) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(t);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return this.returnType;
+ }
+ }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
index 2317d82..9ec9800 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
@@ -140,27 +140,13 @@ class YarnApplicationFileUploader implements AutoCloseable {
IOUtils.closeQuietly(fileSystem);
}
- YarnLocalResourceDescriptor registerSingleLocalResource(
- final String key,
- final Path resourcePath,
- final String relativeDstPath,
- final boolean whetherToAddToRemotePaths,
- final boolean whetherToAddToEnvShipResourceList) throws IOException {
- return registerSingleLocalResource(
- key,
- resourcePath,
- relativeDstPath,
- whetherToAddToRemotePaths,
- whetherToAddToEnvShipResourceList,
- LocalResourceType.FILE);
- }
-
/**
* Registers a single local/remote resource and adds it to <tt>localResources</tt>.
* @param key the key to add the resource under
* @param resourcePath path of the resource to be registered
* @param relativeDstPath the relative path at the target location
* (this will be prefixed by the application-specific directory)
+ * @param resourceType type of the resource, which can be one of FILE, PATTERN, or ARCHIVE
* @param whetherToAddToRemotePaths whether to add the path of local resource to <tt>remotePaths</tt>
* @param whetherToAddToEnvShipResourceList whether to add the local resource to <tt>envShipResourceList</tt>
*
@@ -170,9 +156,9 @@ class YarnApplicationFileUploader implements AutoCloseable {
final String key,
final Path resourcePath,
final String relativeDstPath,
+ final LocalResourceType resourceType,
final boolean whetherToAddToRemotePaths,
- final boolean whetherToAddToEnvShipResourceList,
- final LocalResourceType resourceType) throws IOException {
+ final boolean whetherToAddToEnvShipResourceList) throws IOException {
addToRemotePaths(whetherToAddToRemotePaths, resourcePath);
@@ -195,8 +181,7 @@ class YarnApplicationFileUploader implements AutoCloseable {
localFile.length(),
remoteFileInfo.f1,
LocalResourceVisibility.APPLICATION,
- resourceType
- );
+ resourceType);
addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
localResources.put(key, descriptor.toLocalResource());
return descriptor;
@@ -225,13 +210,6 @@ class YarnApplicationFileUploader implements AutoCloseable {
return Tuple2.of(dst, fss[0].getModificationTime());
}
- List<String> registerMultipleLocalResources(
- final Collection<Path> shipFiles,
- final String localResourcesDirectory
- ) throws IOException {
- return registerMultipleLocalResources(shipFiles, localResourcesDirectory, LocalResourceType.FILE);
- }
-
/**
* Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
* for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately. If it is
@@ -241,14 +219,15 @@ class YarnApplicationFileUploader implements AutoCloseable {
* local or remote files to register as Yarn local resources
* @param localResourcesDirectory
* the directory the localResources are uploaded to
+ * @param resourceType
+ * type of the resource, which can be one of FILE, PATTERN, or ARCHIVE
*
* @return list of class paths with the proper resource keys from the registration
*/
List<String> registerMultipleLocalResources(
final Collection<Path> shipFiles,
final String localResourcesDirectory,
- LocalResourceType resourceType
- ) throws IOException {
+ final LocalResourceType resourceType) throws IOException {
final List<Path> localPaths = new ArrayList<>();
final List<Path> relativePaths = new ArrayList<>();
@@ -295,10 +274,9 @@ class YarnApplicationFileUploader implements AutoCloseable {
key,
localPath,
relativePath.getParent().toString(),
+ resourceType,
true,
- true,
- resourceType
- );
+ true);
if (!resourceDescriptor.alreadyRegisteredAsLocalResource()) {
if (key.endsWith("jar")) {
@@ -331,10 +309,9 @@ class YarnApplicationFileUploader implements AutoCloseable {
localJarPath.getName(),
localJarPath,
"",
+ LocalResourceType.FILE,
true,
- false,
- LocalResourceType.FILE
- );
+ false);
return flinkDist;
}
@@ -345,34 +322,26 @@ class YarnApplicationFileUploader implements AutoCloseable {
* @return list of class paths with the file name
*/
List<String> registerProvidedLocalResources() {
- return registerLocalResources(providedSharedLibs, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
- }
-
- List<String> registerLocalResources(
- Map<String, FileStatus> resources,
- LocalResourceVisibility resourceVisibility,
- LocalResourceType resourceType) {
checkNotNull(localResources);
final ArrayList<String> classPaths = new ArrayList<>();
- resources.forEach(
- (fileName, fileStatus) -> {
- final Path filePath = fileStatus.getPath();
- LOG.debug("Using remote file {} to register local resource", filePath);
-
- final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
- .fromFileStatus(fileName, fileStatus, resourceVisibility, resourceType);
- localResources.put(fileName, descriptor.toLocalResource());
- remotePaths.add(filePath);
- envShipResourceList.add(descriptor);
-
- if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
- classPaths.add(fileName);
- } else if (isFlinkDistJar(filePath.getName())) {
- flinkDist = descriptor;
-
- }
- });
+ providedSharedLibs.forEach(
+ (fileName, fileStatus) -> {
+ final Path filePath = fileStatus.getPath();
+ LOG.debug("Using remote file {} to register local resource", filePath);
+
+ final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
+ .fromFileStatus(fileName, fileStatus, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
+ localResources.put(fileName, descriptor.toLocalResource());
+ remotePaths.add(filePath);
+ envShipResourceList.add(descriptor);
+
+ if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
+ classPaths.add(fileName);
+ } else if (isFlinkDistJar(filePath.getName())) {
+ flinkDist = descriptor;
+ }
+ });
return classPaths;
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f359504..6d00fba 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
@@ -172,8 +173,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
- decodeDirsToShipToCluster(flinkConfiguration).ifPresent(this::addShipFiles);
- decodeArchivesToShipToCluster(flinkConfiguration).ifPresent(this::addShipArchives);
+ decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_DIRECTORIES).ifPresent(this::addShipFiles);
+ decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES).ifPresent(this::addShipArchives);
this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
@@ -184,17 +185,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
this.zookeeperNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
}
- private Optional<List<File>> decodeDirsToShipToCluster(final Configuration configuration) {
+ private Optional<List<File>> decodeFilesToShipToCluster(
+ final Configuration configuration,
+ final ConfigOption<List<String>> configOption) {
checkNotNull(configuration);
+ checkNotNull(configOption);
- final List<File> files = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
- return files.isEmpty() ? Optional.empty() : Optional.of(files);
- }
-
- private Optional<List<File>> decodeArchivesToShipToCluster(final Configuration configuration) {
- checkNotNull(configuration);
-
- final List<File> files = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_ARCHIVES, File::new);
+ final List<File> files = ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
return files.isEmpty() ? Optional.empty() : Optional.of(files);
}
@@ -263,7 +260,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
* Adds the given files to the list of files to ship.
*
* <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
- * {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String)}
+ * {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String, LocalResourceType)}
* since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
*
* @param shipFiles files to ship
@@ -278,12 +275,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
this.shipFiles.addAll(shipFiles);
}
- /**
- * Adds the given files to the list of archives to ship.
- **
- * @param shipArchives archives to ship
- */
- public void addShipArchives(List<File> shipArchives) {
+ private void addShipArchives(List<File> shipArchives) {
checkArgument(isArchiveOnlyIncludedInShipArchiveFiles(shipArchives), "Non-archive files are included.");
this.shipArchives.addAll(shipArchives);
}
@@ -293,7 +285,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
.filter(File::isFile)
.map(File::getName)
.map(String::toLowerCase)
- .allMatch(name -> name.endsWith(".tar.gz") ||
+ .allMatch(name ->
+ name.endsWith(".tar.gz") ||
name.endsWith(".tar") ||
name.endsWith(".tgz") ||
name.endsWith(".dst") ||
@@ -797,7 +790,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies = fileUploader.registerMultipleLocalResources(
systemShipFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
- Path.CUR_DIR);
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
@@ -807,20 +801,24 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
- Path.CUR_DIR);
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
}
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
- Path.CUR_DIR, LocalResourceType.ARCHIVE);
+ Path.CUR_DIR,
+ LocalResourceType.ARCHIVE);
}
// Upload and register user jars
final List<String> userClassPaths = fileUploader.registerMultipleLocalResources(
userJarFiles,
- userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ?
- ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR : Path.CUR_DIR);
+ userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
+ ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
+ : Path.CUR_DIR,
+ LocalResourceType.FILE);
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
@@ -863,6 +861,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
+ LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
@@ -886,8 +885,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
- new Path(tmpConfigurationFile.toURI()),
+ new Path(tmpConfigurationFile.getAbsolutePath()),
"",
+ LocalResourceType.FILE,
true,
true);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
@@ -918,6 +918,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
+ LocalResourceType.FILE,
false,
false).getPath();
@@ -930,6 +931,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
+ LocalResourceType.FILE,
false,
false).getPath();
hasKrb5 = true;
@@ -949,6 +951,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
localizedKeytabPath,
new Path(keytab),
"",
+ LocalResourceType.FILE,
false,
false).getPath();
} else {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
index 21aa32a..b091fa9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
@@ -56,24 +56,13 @@ class YarnLocalResourceDescriptor {
long resourceSize,
long modificationTime,
LocalResourceVisibility visibility,
- LocalResourceType resourceType
- ) {
+ LocalResourceType resourceType) {
this.resourceKey = checkNotNull(resourceKey);
this.path = checkNotNull(path);
this.size = resourceSize;
this.modificationTime = modificationTime;
this.visibility = checkNotNull(visibility);
- this.resourceType = resourceType;
- }
-
- YarnLocalResourceDescriptor(
- String resourceKey,
- Path path,
- long resourceSize,
- long modificationTime,
- LocalResourceVisibility visibility
- ) {
- this(resourceKey, path, resourceSize, modificationTime, visibility, LocalResourceType.FILE);
+ this.resourceType = checkNotNull(resourceType);
}
boolean alreadyRegisteredAsLocalResource() {
@@ -114,8 +103,7 @@ class YarnLocalResourceDescriptor {
Long.parseLong(m.group(3)),
Long.parseLong(m.group(4)),
LocalResourceVisibility.valueOf(m.group(5)),
- LocalResourceType.valueOf(m.group(6))
- );
+ LocalResourceType.valueOf(m.group(6)));
} else {
throw new FlinkException("Error to parse YarnLocalResourceDescriptor from " + desc);
}
@@ -135,8 +123,7 @@ class YarnLocalResourceDescriptor {
fileStatus.getLen(),
fileStatus.getModificationTime(),
visibility,
- resourceType
- );
+ resourceType);
}
@Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 1bf2af6..6d92996 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -215,7 +215,8 @@ public class YarnConfigOptions {
.asList()
.noDefaultValue()
.withDescription("A semicolon-separated list of archives to be shipped to the YARN cluster." +
- " They will be un-packed when localizing.");
+ " These archives will be un-packed when localizing and they can be any of the following types: " +
+ "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
public static final ConfigOption<String> FLINK_DIST_JAR =
key("yarn.flink-dist-jar")
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index c9fd15c..a5ae14c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -58,5 +58,4 @@ public class UtilsTest extends TestLogger {
assertThat(files.count(), equalTo(0L));
}
}
-
}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
index ba23fc7..774130f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.AfterClass;
import org.junit.Assume;
@@ -40,7 +41,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
- import java.io.File;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -218,7 +219,8 @@ public class YarnFileStageTest extends TestLogger {
final List<String> classpath = uploader.registerMultipleLocalResources(
Collections.singletonList(srcPath),
- localResourceDirectory);
+ localResourceDirectory,
+ LocalResourceType.FILE);
final Path basePath = new Path(localResourceDirectory, srcPath.getName());
final Path nestedPath = new Path(basePath, "nested");
@@ -283,7 +285,8 @@ public class YarnFileStageTest extends TestLogger {
final List<String> classpath = uploader.registerMultipleLocalResources(
Collections.singletonList(new Path(srcDir.getAbsolutePath(), localFile)),
- localResourceDirectory);
+ localResourceDirectory,
+ LocalResourceType.FILE);
assertThat(classpath, containsInAnyOrder(new Path(localResourceDirectory, localFile).toString()));
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
index d765cb7..071d0bd 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
@@ -48,8 +48,7 @@ public class YarnLocalResourceDescriptionTest extends TestLogger {
size,
ts,
LocalResourceVisibility.PUBLIC,
- LocalResourceType.FILE
- );
+ LocalResourceType.FILE);
final String desc = localResourceDesc.toString();
YarnLocalResourceDescriptor newLocalResourceDesc = YarnLocalResourceDescriptor.fromString(desc);
@@ -59,7 +58,6 @@ public class YarnLocalResourceDescriptionTest extends TestLogger {
assertThat(newLocalResourceDesc.getModificationTime(), is(ts));
assertThat(newLocalResourceDesc.getVisibility(), is(LocalResourceVisibility.PUBLIC));
assertThat(newLocalResourceDesc.getResourceType(), is(LocalResourceType.FILE));
-
}
@Test