Posted to commits@flink.apache.org by kk...@apache.org on 2020/07/30 12:14:41 UTC

[flink] branch master updated: [FLINK-18362][yarn] Fix mistakenly merged commit 0e10fd5b8ee0

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a0227e2  [FLINK-18362][yarn] Fix mistakenly merged commit 0e10fd5b8ee0
a0227e2 is described below

commit a0227e20430ee9eaff59464023de2385378f71ea
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Jul 30 14:08:43 2020 +0200

    [FLINK-18362][yarn] Fix mistakenly merged commit 0e10fd5b8ee0
---
 .../generated/yarn_config_configuration.html       |   2 +-
 .../test/java/org/apache/flink/yarn/UtilsTest.java |   4 +-
 .../java/org/apache/flink/yarn/YARNITCase.java     |  14 +-
 .../flink/yarn/testjob/YarnTestArchiveJob.java     | 146 +++++++++++++++++++++
 .../flink/yarn/YarnApplicationFileUploader.java    |  87 ++++--------
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  53 ++++----
 .../flink/yarn/YarnLocalResourceDescriptor.java    |  21 +--
 .../yarn/configuration/YarnConfigOptions.java      |   3 +-
 .../test/java/org/apache/flink/yarn/UtilsTest.java |   1 -
 .../org/apache/flink/yarn/YarnFileStageTest.java   |   9 +-
 .../yarn/YarnLocalResourceDescriptionTest.java     |   4 +-
 11 files changed, 232 insertions(+), 112 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 8126629..7173a0e 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -138,7 +138,7 @@
             <td><h5>yarn.ship-archives</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>List&lt;String&gt;</td>
-            <td>A semicolon-separated list of archives to be shipped to the YARN cluster. They will be un-packed when localizing.</td>
+            <td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".</td>
         </tr>
         <tr>
             <td><h5>yarn.ship-directories</h5></td>
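For illustration, the yarn.ship-archives option described above can also be set programmatically
on the client-side Configuration, which is what the new YarnTestArchiveJob test further down in
this commit does. A minimal sketch, assuming hypothetical archive paths:

    import java.util.Arrays;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.yarn.configuration.YarnConfigOptions;

    // Ship two archives to the YARN cluster; each one is unpacked by YARN
    // when the container resources are localized (see yarn.ship-archives above).
    Configuration flinkConfig = new Configuration();
    flinkConfig.set(
            YarnConfigOptions.SHIP_ARCHIVES,
            Arrays.asList("/path/to/udfs.tar.gz", "/path/to/resources.zip"));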
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 76aac0e..a2ad133 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -100,7 +101,8 @@ public class UtilsTest extends TestLogger {
 			new Path(root.toURI()),
 			0,
 			System.currentTimeMillis(),
-			LocalResourceVisibility.APPLICATION).toString());
+			LocalResourceVisibility.APPLICATION,
+			LocalResourceType.FILE).toString());
 		env = Collections.unmodifiableMap(env);
 
 		File credentialFile = temporaryFolder.newFile("container_tokens");
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 88d91ca..4a9db23 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.testjob.YarnTestArchiveJob;
 import org.apache.flink.yarn.testjob.YarnTestCacheJob;
 import org.apache.flink.yarn.util.TestUtils;
 
@@ -39,7 +40,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
@@ -63,6 +66,9 @@ public class YARNITCase extends YarnTestBase {
 	private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
 	private static final int sleepIntervalInMS = 100;
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@BeforeClass
 	public static void setup() {
 		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
@@ -101,10 +107,16 @@ public class YARNITCase extends YarnTestBase {
 
 		final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
 		flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(remoteLib.toString()));
-
 		runTest(() -> deployPerJob(flinkConfig, getTestingJobGraph(), false));
 	}
 
+	@Test
+	public void testPerJobWithArchive() throws Exception {
+		final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
+		final JobGraph archiveJobGraph = YarnTestArchiveJob.getArchiveJobGraph(tmp.newFolder(), flinkConfig);
+		runTest(() -> deployPerJob(flinkConfig, archiveJobGraph, true));
+	}
+
 	private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolean withDist) throws Exception {
 		try (final YarnClusterDescriptor yarnClusterDescriptor = withDist
 				? createYarnClusterDescriptor(configuration)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestArchiveJob.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestArchiveJob.java
new file mode 100644
index 0000000..0febb3f
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestArchiveJob.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.testjob;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Testing job for localizing resources of LocalResourceType.ARCHIVE in per job cluster mode.
+ */
+public class YarnTestArchiveJob {
+	private static final List<String> LIST = ImmutableList.of("test1", "test2");
+
+	private static final Map<String, String> srcFiles = new HashMap<String, String>() {{
+			put("local1.txt", "Local text Content1");
+			put("local2.txt", "Local text Content2");
+		}};
+
+	private static void archiveFilesInDirectory(File directory, String target) throws IOException {
+		for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
+			Files.write(
+					Paths.get(directory.getAbsolutePath() + File.separator + entry.getKey()),
+					entry.getValue().getBytes());
+		}
+
+		try (FileOutputStream fos = new FileOutputStream(target);
+				GzipCompressorOutputStream gos = new GzipCompressorOutputStream(new BufferedOutputStream(fos));
+				TarArchiveOutputStream taros = new TarArchiveOutputStream(gos)) {
+
+			taros.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+			for (File f : directory.listFiles()) {
+				taros.putArchiveEntry(new TarArchiveEntry(f, directory.getName() + File.separator + f.getName()));
+
+				try (FileInputStream fis = new FileInputStream(f); BufferedInputStream bis = new BufferedInputStream(fis)) {
+					IOUtils.copy(bis, taros);
+					taros.closeArchiveEntry();
+				}
+			}
+		}
+
+		for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
+			Files.delete(Paths.get(directory.getAbsolutePath() + File.separator + entry.getKey()));
+		}
+
+	}
+
+	public static JobGraph getArchiveJobGraph(File testDirectory, Configuration config) throws IOException {
+
+		final String archive = testDirectory.getAbsolutePath().concat(".tar.gz");
+		archiveFilesInDirectory(testDirectory, archive);
+		config.set(YarnConfigOptions.SHIP_ARCHIVES, Collections.singletonList(archive));
+
+		final String localizedPath = testDirectory.getName().concat(".tar.gz") + File.separator + testDirectory.getName();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		env.addSource(new SourceFunctionWithArchive<>(LIST, localizedPath, TypeInformation.of(String.class)))
+			.addSink(new DiscardingSink<>());
+
+		return env.getStreamGraph().getJobGraph();
+	}
+
+	private static class SourceFunctionWithArchive<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
+
+		private final List<T> inputDataset;
+		private final String resourcePath;
+		private final TypeInformation<T> returnType;
+
+		SourceFunctionWithArchive(List<T> inputDataset, String resourcePath, TypeInformation<T> returnType) {
+			this.inputDataset = inputDataset;
+			this.resourcePath = resourcePath;
+			this.returnType = returnType;
+		}
+
+		public void open(Configuration parameters) throws Exception {
+			for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
+				Path path = Paths.get(resourcePath + File.separator + entry.getKey());
+				String content = new String(Files.readAllBytes(path));
+				checkArgument(entry.getValue().equals(content), "The content of the unpacked file should be identical to the original file's.");
+			}
+		}
+
+		@Override
+		public void run(SourceContext<T> ctx) {
+			for (T t : inputDataset) {
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(t);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {}
+
+		@Override
+		public TypeInformation<T> getProducedType() {
+			return this.returnType;
+		}
+	}
+}
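The localizedPath built in the test above reflects how YARN lays out ARCHIVE resources: each
shipped archive is unpacked into a directory named after the archive file, relative to the
container's working directory. A minimal sketch of reading a bundled file from user code under
that assumption (archive, directory, and file names are hypothetical):

    import java.nio.file.Files;
    import java.nio.file.Paths;

    // A shipped "resources.tar.gz" containing a top-level "resources/" directory
    // with "config.txt" inside becomes reachable under a directory carrying the
    // archive's own name, mirroring how localizedPath is built in the test above.
    String content = new String(
            Files.readAllBytes(Paths.get("resources.tar.gz/resources/config.txt")));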
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
index 2317d82..9ec9800 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
@@ -140,27 +140,13 @@ class YarnApplicationFileUploader implements AutoCloseable {
 		IOUtils.closeQuietly(fileSystem);
 	}
 
-	YarnLocalResourceDescriptor registerSingleLocalResource(
-		final String key,
-		final Path resourcePath,
-		final String relativeDstPath,
-		final boolean whetherToAddToRemotePaths,
-		final boolean whetherToAddToEnvShipResourceList) throws IOException {
-		return registerSingleLocalResource(
-			key,
-			resourcePath,
-			relativeDstPath,
-			whetherToAddToRemotePaths,
-			whetherToAddToEnvShipResourceList,
-			LocalResourceType.FILE);
-	}
-
 	/**
 	 * Register a single local/remote resource and adds it to <tt>localResources</tt>.
 	 * @param key the key to add the resource under
 	 * @param resourcePath path of the resource to be registered
 	 * @param relativeDstPath the relative path at the target location
 	 *                              (this will be prefixed by the application-specific directory)
+	 * @param resourceType type of the resource, which can be one of FILE, PATTERN, or ARCHIVE
 	 * @param whetherToAddToRemotePaths whether to add the path of local resource to <tt>remotePaths</tt>
 	 * @param whetherToAddToEnvShipResourceList whether to add the local resource to <tt>envShipResourceList</tt>
 	 *
@@ -170,9 +156,9 @@ class YarnApplicationFileUploader implements AutoCloseable {
 			final String key,
 			final Path resourcePath,
 			final String relativeDstPath,
+			final LocalResourceType resourceType,
 			final boolean whetherToAddToRemotePaths,
-			final boolean whetherToAddToEnvShipResourceList,
-			final LocalResourceType resourceType) throws IOException {
+			final boolean whetherToAddToEnvShipResourceList) throws IOException {
 
 		addToRemotePaths(whetherToAddToRemotePaths, resourcePath);
 
@@ -195,8 +181,7 @@ class YarnApplicationFileUploader implements AutoCloseable {
 			localFile.length(),
 			remoteFileInfo.f1,
 			LocalResourceVisibility.APPLICATION,
-			resourceType
-		);
+			resourceType);
 		addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
 		localResources.put(key, descriptor.toLocalResource());
 		return descriptor;
@@ -225,13 +210,6 @@ class YarnApplicationFileUploader implements AutoCloseable {
 		return Tuple2.of(dst, fss[0].getModificationTime());
 	}
 
-	List<String> registerMultipleLocalResources(
-		final Collection<Path> shipFiles,
-		final String localResourcesDirectory
-	) throws IOException {
-		return registerMultipleLocalResources(shipFiles, localResourcesDirectory, LocalResourceType.FILE);
-	}
-
 	/**
 	 * Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
 	 * for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately. If it is
@@ -241,14 +219,15 @@ class YarnApplicationFileUploader implements AutoCloseable {
 	 * 		local or remote files to register as Yarn local resources
 	 * @param localResourcesDirectory
 	 *		the directory the localResources are uploaded to
+	 * @param resourceType
+	 *      type of the resource, which can be one of FILE, PATTERN, or ARCHIVE
 	 *
 	 * @return list of class paths with the proper resource keys from the registration
 	 */
 	List<String> registerMultipleLocalResources(
 			final Collection<Path> shipFiles,
 			final String localResourcesDirectory,
-			LocalResourceType resourceType
-			) throws IOException {
+			final LocalResourceType resourceType) throws IOException {
 
 		final List<Path> localPaths = new ArrayList<>();
 		final List<Path> relativePaths = new ArrayList<>();
@@ -295,10 +274,9 @@ class YarnApplicationFileUploader implements AutoCloseable {
 						key,
 						localPath,
 						relativePath.getParent().toString(),
+						resourceType,
 						true,
-						true,
-						resourceType
-					);
+						true);
 
 				if (!resourceDescriptor.alreadyRegisteredAsLocalResource()) {
 					if (key.endsWith("jar")) {
@@ -331,10 +309,9 @@ class YarnApplicationFileUploader implements AutoCloseable {
 				localJarPath.getName(),
 				localJarPath,
 				"",
+				LocalResourceType.FILE,
 				true,
-				false,
-				LocalResourceType.FILE
-		);
+				false);
 		return flinkDist;
 	}
 
@@ -345,34 +322,26 @@ class YarnApplicationFileUploader implements AutoCloseable {
 	 * @return list of class paths with the file name
 	 */
 	List<String> registerProvidedLocalResources() {
-		return registerLocalResources(providedSharedLibs, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
-	}
-
-	List<String> registerLocalResources(
-			Map<String, FileStatus> resources,
-			LocalResourceVisibility resourceVisibility,
-			LocalResourceType resourceType) {
 		checkNotNull(localResources);
 
 		final ArrayList<String> classPaths = new ArrayList<>();
-		resources.forEach(
-			(fileName, fileStatus) -> {
-				final Path filePath = fileStatus.getPath();
-				LOG.debug("Using remote file {} to register local resource", filePath);
-
-				final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
-					.fromFileStatus(fileName, fileStatus, resourceVisibility, resourceType);
-				localResources.put(fileName, descriptor.toLocalResource());
-				remotePaths.add(filePath);
-				envShipResourceList.add(descriptor);
-
-				if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
-					classPaths.add(fileName);
-				} else if (isFlinkDistJar(filePath.getName())) {
-					flinkDist = descriptor;
-
-				}
-			});
+		providedSharedLibs.forEach(
+				(fileName, fileStatus) -> {
+					final Path filePath = fileStatus.getPath();
+					LOG.debug("Using remote file {} to register local resource", filePath);
+
+					final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
+							.fromFileStatus(fileName, fileStatus, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
+					localResources.put(fileName, descriptor.toLocalResource());
+					remotePaths.add(filePath);
+					envShipResourceList.add(descriptor);
+
+					if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
+						classPaths.add(fileName);
+					} else if (isFlinkDistJar(filePath.getName())) {
+						flinkDist = descriptor;
+					}
+				});
 		return classPaths;
 	}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f359504..6d00fba 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -172,8 +173,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
 		getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
-		decodeDirsToShipToCluster(flinkConfiguration).ifPresent(this::addShipFiles);
-		decodeArchivesToShipToCluster(flinkConfiguration).ifPresent(this::addShipArchives);
+		decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_DIRECTORIES).ifPresent(this::addShipFiles);
+		decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES).ifPresent(this::addShipArchives);
 
 		this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
 		this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
@@ -184,17 +185,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		this.zookeeperNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
 	}
 
-	private Optional<List<File>> decodeDirsToShipToCluster(final Configuration configuration) {
+	private Optional<List<File>> decodeFilesToShipToCluster(
+			final Configuration configuration,
+			final ConfigOption<List<String>> configOption) {
 		checkNotNull(configuration);
+		checkNotNull(configOption);
 
-		final List<File> files = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
-		return files.isEmpty() ? Optional.empty() : Optional.of(files);
-	}
-
-	private Optional<List<File>> decodeArchivesToShipToCluster(final Configuration configuration) {
-		checkNotNull(configuration);
-
-		final List<File> files = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_ARCHIVES, File::new);
+		final List<File> files = ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
 		return files.isEmpty() ? Optional.empty() : Optional.of(files);
 	}
 
@@ -263,7 +260,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 	 * Adds the given files to the list of files to ship.
 	 *
 	 * <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
-	 * {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String)}
+	 * {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String, LocalResourceType)}
 	 * since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
 	 *
 	 * @param shipFiles files to ship
@@ -278,12 +275,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		this.shipFiles.addAll(shipFiles);
 	}
 
-	/**
-	 * Adds the given files to the list of archives to ship.
-	 **
-	 * @param shipArchives archives to ship
-	 */
-	public void addShipArchives(List<File> shipArchives) {
+	private void addShipArchives(List<File> shipArchives) {
 		checkArgument(isArchiveOnlyIncludedInShipArchiveFiles(shipArchives), "Non-archive files are included.");
 		this.shipArchives.addAll(shipArchives);
 	}
@@ -293,7 +285,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 			.filter(File::isFile)
 			.map(File::getName)
 			.map(String::toLowerCase)
-			.allMatch(name -> name.endsWith(".tar.gz") ||
+			.allMatch(name ->
+				name.endsWith(".tar.gz") ||
 				name.endsWith(".tar") ||
 				name.endsWith(".tgz") ||
 				name.endsWith(".dst") ||
@@ -797,7 +790,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
 		final List<String> uploadedDependencies = fileUploader.registerMultipleLocalResources(
 			systemShipFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
-			Path.CUR_DIR);
+			Path.CUR_DIR,
+			LocalResourceType.FILE);
 		systemClassPaths.addAll(uploadedDependencies);
 
 		// upload and register ship-only files
@@ -807,20 +801,24 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 			addPluginsFoldersToShipFiles(shipOnlyFiles);
 			fileUploader.registerMultipleLocalResources(
 					shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
-					Path.CUR_DIR);
+					Path.CUR_DIR,
+					LocalResourceType.FILE);
 		}
 
 		if (!shipArchives.isEmpty()) {
 			fileUploader.registerMultipleLocalResources(
 				shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
-				Path.CUR_DIR, LocalResourceType.ARCHIVE);
+				Path.CUR_DIR,
+				LocalResourceType.ARCHIVE);
 		}
 
 		// Upload and register user jars
 		final List<String> userClassPaths = fileUploader.registerMultipleLocalResources(
 			userJarFiles,
-			userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ?
-				ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR : Path.CUR_DIR);
+			userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
+					? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
+					: Path.CUR_DIR,
+			LocalResourceType.FILE);
 
 		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
 			systemClassPaths.addAll(userClassPaths);
@@ -863,6 +861,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 					jobGraphFilename,
 					new Path(tmpJobGraphFile.toURI()),
 					"",
+					LocalResourceType.FILE,
 					true,
 					false);
 				classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
@@ -886,8 +885,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 			String flinkConfigKey = "flink-conf.yaml";
 			fileUploader.registerSingleLocalResource(
 				flinkConfigKey,
-				new Path(tmpConfigurationFile.toURI()),
+				new Path(tmpConfigurationFile.getAbsolutePath()),
 				"",
+				LocalResourceType.FILE,
 				true,
 				true);
 			classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
@@ -918,6 +918,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 				Utils.YARN_SITE_FILE_NAME,
 				yarnSitePath,
 				"",
+				LocalResourceType.FILE,
 				false,
 				false).getPath();
 
@@ -930,6 +931,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 					Utils.KRB5_FILE_NAME,
 					krb5ConfPath,
 					"",
+					LocalResourceType.FILE,
 					false,
 					false).getPath();
 				hasKrb5 = true;
@@ -949,6 +951,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 					localizedKeytabPath,
 					new Path(keytab),
 					"",
+					LocalResourceType.FILE,
 					false,
 					false).getPath();
 			} else {
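The refactor at the top of this file's diff collapses two copy-pasted decode methods into a single
helper parameterised over the ConfigOption. A standalone sketch of the resulting pattern (method
name kept, surrounding class and null checks omitted):

    import java.io.File;
    import java.util.List;
    import java.util.Optional;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigUtils;
    import org.apache.flink.configuration.Configuration;

    static Optional<List<File>> decodeFilesToShipToCluster(
            Configuration configuration, ConfigOption<List<String>> configOption) {
        // Map every configured string entry to a File; an empty result becomes
        // Optional.empty() so callers can chain ifPresent(this::addShipFiles)
        // or ifPresent(this::addShipArchives), as the constructor does above.
        List<File> files = ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
        return files.isEmpty() ? Optional.empty() : Optional.of(files);
    }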
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
index 21aa32a..b091fa9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java
@@ -56,24 +56,13 @@ class YarnLocalResourceDescriptor {
 			long resourceSize,
 			long modificationTime,
 			LocalResourceVisibility visibility,
-			LocalResourceType resourceType
-			) {
+			LocalResourceType resourceType) {
 		this.resourceKey = checkNotNull(resourceKey);
 		this.path = checkNotNull(path);
 		this.size = resourceSize;
 		this.modificationTime = modificationTime;
 		this.visibility = checkNotNull(visibility);
-		this.resourceType = resourceType;
-	}
-
-	YarnLocalResourceDescriptor(
-		String resourceKey,
-		Path path,
-		long resourceSize,
-		long modificationTime,
-		LocalResourceVisibility visibility
-	) {
-		this(resourceKey, path, resourceSize, modificationTime, visibility, LocalResourceType.FILE);
+		this.resourceType = checkNotNull(resourceType);
 	}
 
 	boolean alreadyRegisteredAsLocalResource() {
@@ -114,8 +103,7 @@ class YarnLocalResourceDescriptor {
 				Long.parseLong(m.group(3)),
 				Long.parseLong(m.group(4)),
 				LocalResourceVisibility.valueOf(m.group(5)),
-				LocalResourceType.valueOf(m.group(6))
-			);
+				LocalResourceType.valueOf(m.group(6)));
 		} else {
 			throw new FlinkException("Error to parse YarnLocalResourceDescriptor from " + desc);
 		}
@@ -135,8 +123,7 @@ class YarnLocalResourceDescriptor {
 				fileStatus.getLen(),
 				fileStatus.getModificationTime(),
 				visibility,
-				resourceType
-			);
+				resourceType);
 	}
 
 	@Override
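Since the FILE-defaulting constructor is gone, the resource type is part of every descriptor and of
its serialized string form. A minimal round-trip sketch, as exercised by the updated
YarnLocalResourceDescriptionTest at the end of this commit (values are hypothetical; the class is
package-private, so this only compiles from within org.apache.flink.yarn):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;

    YarnLocalResourceDescriptor original = new YarnLocalResourceDescriptor(
            "udfs.tar.gz",
            new Path("hdfs:///user/flink/udfs.tar.gz"),
            1024L,                                // resourceSize
            System.currentTimeMillis(),           // modificationTime
            LocalResourceVisibility.APPLICATION,
            LocalResourceType.ARCHIVE);           // resourceType is now checkNotNull'ed

    // fromString throws FlinkException if the string does not match the expected pattern.
    YarnLocalResourceDescriptor parsed =
            YarnLocalResourceDescriptor.fromString(original.toString());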
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 1bf2af6..6d92996 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -215,7 +215,8 @@ public class YarnConfigOptions {
 				.asList()
 				.noDefaultValue()
 				.withDescription("A semicolon-separated list of archives to be shipped to the YARN cluster." +
-						" They will be un-packed when localizing.");
+						" These archives will be un-packed when localizing and they can be any of the following types: " +
+						"\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
 
 	public static final ConfigOption<String> FLINK_DIST_JAR =
 			key("yarn.flink-dist-jar")
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index c9fd15c..a5ae14c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -58,5 +58,4 @@ public class UtilsTest extends TestLogger {
 			assertThat(files.count(), equalTo(0L));
 		}
 	}
-
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
index ba23fc7..774130f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.AfterClass;
 import org.junit.Assume;
@@ -40,7 +41,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-					import java.io.File;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -218,7 +219,8 @@ public class YarnFileStageTest extends TestLogger {
 
 			final List<String> classpath = uploader.registerMultipleLocalResources(
 				Collections.singletonList(srcPath),
-				localResourceDirectory);
+				localResourceDirectory,
+				LocalResourceType.FILE);
 
 			final Path basePath = new Path(localResourceDirectory, srcPath.getName());
 			final Path nestedPath = new Path(basePath, "nested");
@@ -283,7 +285,8 @@ public class YarnFileStageTest extends TestLogger {
 
 			final List<String> classpath = uploader.registerMultipleLocalResources(
 				Collections.singletonList(new Path(srcDir.getAbsolutePath(), localFile)),
-				localResourceDirectory);
+				localResourceDirectory,
+				LocalResourceType.FILE);
 
 			assertThat(classpath, containsInAnyOrder(new Path(localResourceDirectory, localFile).toString()));
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
index d765cb7..071d0bd 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnLocalResourceDescriptionTest.java
@@ -48,8 +48,7 @@ public class YarnLocalResourceDescriptionTest extends TestLogger {
 			size,
 			ts,
 			LocalResourceVisibility.PUBLIC,
-			LocalResourceType.FILE
-		);
+			LocalResourceType.FILE);
 
 		final String desc = localResourceDesc.toString();
 		YarnLocalResourceDescriptor newLocalResourceDesc = YarnLocalResourceDescriptor.fromString(desc);
@@ -59,7 +58,6 @@ public class YarnLocalResourceDescriptionTest extends TestLogger {
 		assertThat(newLocalResourceDesc.getModificationTime(), is(ts));
 		assertThat(newLocalResourceDesc.getVisibility(), is(LocalResourceVisibility.PUBLIC));
 		assertThat(newLocalResourceDesc.getResourceType(), is(LocalResourceType.FILE));
-
 	}
 
 	@Test