You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2018/01/26 19:30:16 UTC
hadoop git commit: YARN-2185. Use pipes when localizing archives.
Contributed by Miklos Szegedi
Repository: hadoop
Updated Branches:
refs/heads/trunk f2fa736f0 -> 1b0f265db
YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b0f265d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b0f265d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b0f265d
Branch: refs/heads/trunk
Commit: 1b0f265db1a5bfccf1d870912237ea9618bd9c34
Parents: f2fa736
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jan 26 13:25:20 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Jan 26 13:25:20 2018 -0600
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileUtil.java | 251 ++++++++++++++++++-
.../java/org/apache/hadoop/util/RunJar.java | 65 +++++
.../org/apache/hadoop/yarn/util/FSDownload.java | 215 ++++++++++------
.../apache/hadoop/yarn/util/TestFSDownload.java | 30 ++-
4 files changed, 462 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index 4d971aa..bf9b146 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -20,27 +20,35 @@ package org.apache.hadoop.fs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -75,6 +83,11 @@ public class FileUtil {
public static final int SYMLINK_NO_PRIVILEGE = 2;
/**
+ * Buffer size for copy the content of compressed file to new file.
+ */
+ private static final int BUFFER_SIZE = 8_192;
+
+ /**
* convert an array of FileStatus to an array of Path
*
* @param stats
@@ -526,6 +539,22 @@ public class FileUtil {
}
/**
+ * Convert a os-native filename to a path that works for the shell
+ * and avoids script injection attacks.
+ * @param file The filename to convert
+ * @return The unix pathname
+ * @throws IOException on windows, there can be problems with the subprocess
+ */
+ public static String makeSecureShellPath(File file) throws IOException {
+ if (Shell.WINDOWS) {
+ // Currently it is never called, but it might be helpful in the future.
+ throw new UnsupportedOperationException("Not implemented for Windows");
+ } else {
+ return makeShellPath(file, false).replace("'", "'\\''");
+ }
+ }
+
+ /**
* Convert a os-native filename to a path that works for the shell.
* @param file The filename to convert
* @param makeCanonicalPath
@@ -576,11 +605,48 @@ public class FileUtil {
}
/**
- * Given a File input it will unzip the file in a the unzip directory
+ * Given a stream input it will unzip the it in the unzip directory.
+ * passed as the second parameter
+ * @param inputStream The zip file as input
+ * @param toDir The unzip directory where to unzip the zip file.
+ * @throws IOException an exception occurred
+ */
+ public static void unZip(InputStream inputStream, File toDir)
+ throws IOException {
+ try (ZipInputStream zip = new ZipInputStream(inputStream)) {
+ int numOfFailedLastModifiedSet = 0;
+ for(ZipEntry entry = zip.getNextEntry();
+ entry != null;
+ entry = zip.getNextEntry()) {
+ if (!entry.isDirectory()) {
+ File file = new File(toDir, entry.getName());
+ File parent = file.getParentFile();
+ if (!parent.mkdirs() &&
+ !parent.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " +
+ parent.getAbsolutePath());
+ }
+ try (OutputStream out = new FileOutputStream(file)) {
+ IOUtils.copyBytes(zip, out, BUFFER_SIZE);
+ }
+ if (!file.setLastModified(entry.getTime())) {
+ numOfFailedLastModifiedSet++;
+ }
+ }
+ }
+ if (numOfFailedLastModifiedSet > 0) {
+ LOG.warn("Could not set last modfied time for {} file(s)",
+ numOfFailedLastModifiedSet);
+ }
+ }
+ }
+
+ /**
+ * Given a File input it will unzip it in the unzip directory.
* passed as the second parameter
* @param inFile The zip file as input
* @param unzipDir The unzip directory where to unzip the zip file.
- * @throws IOException
+ * @throws IOException An I/O exception has occurred
*/
public static void unZip(File inFile, File unzipDir) throws IOException {
Enumeration<? extends ZipEntry> entries;
@@ -621,6 +687,138 @@ public class FileUtil {
}
/**
+ * Run a command and send the contents of an input stream to it.
+ * @param inputStream Input stream to forward to the shell command
+ * @param command shell command to run
+ * @throws IOException read or write failed
+ * @throws InterruptedException command interrupted
+ * @throws ExecutionException task submit failed
+ */
+ private static void runCommandOnStream(
+ InputStream inputStream, String command)
+ throws IOException, InterruptedException, ExecutionException {
+ ExecutorService executor = null;
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(
+ Shell.WINDOWS ? "cmd" : "bash",
+ Shell.WINDOWS ? "/c" : "-c",
+ command);
+ Process process = builder.start();
+ int exitCode;
+ try {
+ // Consume stdout and stderr, to avoid blocking the command
+ executor = Executors.newFixedThreadPool(2);
+ Future output = executor.submit(() -> {
+ try {
+ // Read until the output stream receives an EOF and closed.
+ if (LOG.isDebugEnabled()) {
+ // Log directly to avoid out of memory errors
+ try (BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(process.getInputStream(),
+ Charset.forName("UTF-8")))) {
+ String line;
+ while((line = reader.readLine()) != null) {
+ LOG.debug(line);
+ }
+ }
+ } else {
+ org.apache.commons.io.IOUtils.copy(
+ process.getInputStream(),
+ new IOUtils.NullOutputStream());
+ }
+ } catch (IOException e) {
+ LOG.debug(e.getMessage());
+ }
+ });
+ Future error = executor.submit(() -> {
+ try {
+ // Read until the error stream receives an EOF and closed.
+ if (LOG.isDebugEnabled()) {
+ // Log directly to avoid out of memory errors
+ try (BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(process.getErrorStream(),
+ Charset.forName("UTF-8")))) {
+ String line;
+ while((line = reader.readLine()) != null) {
+ LOG.debug(line);
+ }
+ }
+ } else {
+ org.apache.commons.io.IOUtils.copy(
+ process.getErrorStream(),
+ new IOUtils.NullOutputStream());
+ }
+ } catch (IOException e) {
+ LOG.debug(e.getMessage());
+ }
+ });
+
+ // Pass the input stream to the command to process
+ try {
+ org.apache.commons.io.IOUtils.copy(
+ inputStream, process.getOutputStream());
+ } finally {
+ process.getOutputStream().close();
+ }
+
+ // Wait for both stdout and stderr futures to finish
+ error.get();
+ output.get();
+ } finally {
+ // Clean up the threads
+ if (executor != null) {
+ executor.shutdown();
+ }
+ // Wait to avoid leaking the child process
+ exitCode = process.waitFor();
+ }
+
+ if (exitCode != 0) {
+ throw new IOException(
+ String.format(
+ "Error executing command. %s " +
+ "Process exited with exit code %d.",
+ command, exitCode));
+ }
+ }
+
+ /**
+ * Given a Tar File as input it will untar the file in a the untar directory
+ * passed as the second parameter
+ *
+ * This utility will untar ".tar" files and ".tar.gz","tgz" files.
+ *
+ * @param inputStream The tar file as input.
+ * @param untarDir The untar directory where to untar the tar file.
+ * @param gzipped The input stream is gzipped
+ * TODO Use magic number and PusbackInputStream to identify
+ * @throws IOException an exception occurred
+ * @throws InterruptedException command interrupted
+ * @throws ExecutionException task submit failed
+ */
+ public static void unTar(InputStream inputStream, File untarDir,
+ boolean gzipped)
+ throws IOException, InterruptedException, ExecutionException {
+ if (!untarDir.mkdirs()) {
+ if (!untarDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + untarDir);
+ }
+ }
+
+ if(Shell.WINDOWS) {
+ // Tar is not native to Windows. Use simple Java based implementation for
+ // tests and simple tar archives
+ unTarUsingJava(inputStream, untarDir, gzipped);
+ } else {
+ // spawn tar utility to untar archive for full fledged unix behavior such
+ // as resolving symlinks in tar archives
+ unTarUsingTar(inputStream, untarDir, gzipped);
+ }
+ }
+
+ /**
* Given a Tar File as input it will untar the file in a the untar directory
* passed as the second parameter
*
@@ -650,23 +848,41 @@ public class FileUtil {
}
}
+ private static void unTarUsingTar(InputStream inputStream, File untarDir,
+ boolean gzipped)
+ throws IOException, InterruptedException, ExecutionException {
+ StringBuilder untarCommand = new StringBuilder();
+ if (gzipped) {
+ untarCommand.append("gzip -dc | (");
+ }
+ untarCommand.append("cd '");
+ untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
+ untarCommand.append("' && ");
+ untarCommand.append("tar -x ");
+
+ if (gzipped) {
+ untarCommand.append(")");
+ }
+ runCommandOnStream(inputStream, untarCommand.toString());
+ }
+
private static void unTarUsingTar(File inFile, File untarDir,
boolean gzipped) throws IOException {
StringBuffer untarCommand = new StringBuffer();
if (gzipped) {
untarCommand.append(" gzip -dc '");
- untarCommand.append(FileUtil.makeShellPath(inFile));
+ untarCommand.append(FileUtil.makeSecureShellPath(inFile));
untarCommand.append("' | (");
}
untarCommand.append("cd '");
- untarCommand.append(FileUtil.makeShellPath(untarDir));
- untarCommand.append("' ; ");
+ untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
+ untarCommand.append("' && ");
untarCommand.append("tar -xf ");
if (gzipped) {
untarCommand.append(" -)");
} else {
- untarCommand.append(FileUtil.makeShellPath(inFile));
+ untarCommand.append(FileUtil.makeSecureShellPath(inFile));
}
String[] shellCmd = { "bash", "-c", untarCommand.toString() };
ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
@@ -701,6 +917,29 @@ public class FileUtil {
}
}
+ private static void unTarUsingJava(InputStream inputStream, File untarDir,
+ boolean gzipped) throws IOException {
+ TarArchiveInputStream tis = null;
+ try {
+ if (gzipped) {
+ inputStream = new BufferedInputStream(new GZIPInputStream(
+ inputStream));
+ } else {
+ inputStream =
+ new BufferedInputStream(inputStream);
+ }
+
+ tis = new TarArchiveInputStream(inputStream);
+
+ for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
+ unpackEntries(tis, entry, untarDir);
+ entry = tis.getNextTarEntry();
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, tis, inputStream);
+ }
+ }
+
private static void unpackEntries(TarArchiveInputStream tis,
TarArchiveEntry entry, File outputDir) throws IOException {
if (entry.isDirectory()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
index 19b51ad..89b7d76 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
@@ -34,9 +34,11 @@ import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileUtil;
@@ -98,6 +100,69 @@ public class RunJar {
* Unpack matching files from a jar. Entries inside the jar that do
* not match the given pattern will be skipped.
*
+ * @param inputStream the jar stream to unpack
+ * @param toDir the destination directory into which to unpack the jar
+ * @param unpackRegex the pattern to match jar entries against
+ *
+ * @throws IOException if an I/O error has occurred or toDir
+ * cannot be created and does not already exist
+ */
+ public static void unJar(InputStream inputStream, File toDir,
+ Pattern unpackRegex)
+ throws IOException {
+ try (JarInputStream jar = new JarInputStream(inputStream)) {
+ int numOfFailedLastModifiedSet = 0;
+ for (JarEntry entry = jar.getNextJarEntry();
+ entry != null;
+ entry = jar.getNextJarEntry()) {
+ if (!entry.isDirectory() &&
+ unpackRegex.matcher(entry.getName()).matches()) {
+ File file = new File(toDir, entry.getName());
+ ensureDirectory(file.getParentFile());
+ try (OutputStream out = new FileOutputStream(file)) {
+ IOUtils.copyBytes(jar, out, BUFFER_SIZE);
+ }
+ if (!file.setLastModified(entry.getTime())) {
+ numOfFailedLastModifiedSet++;
+ }
+ }
+ }
+ if (numOfFailedLastModifiedSet > 0) {
+ LOG.warn("Could not set last modfied time for {} file(s)",
+ numOfFailedLastModifiedSet);
+ }
+ }
+ }
+
+ /**
+ * Unpack matching files from a jar. Entries inside the jar that do
+ * not match the given pattern will be skipped. Keep also a copy
+ * of the entire jar in the same directory for backward compatibility.
+ * TODO remove this feature in a new release and do only unJar
+ *
+ * @param inputStream the jar stream to unpack
+ * @param toDir the destination directory into which to unpack the jar
+ * @param unpackRegex the pattern to match jar entries against
+ *
+ * @throws IOException if an I/O error has occurred or toDir
+ * cannot be created and does not already exist
+ */
+ @Deprecated
+ public static void unJarAndSave(InputStream inputStream, File toDir,
+ String name, Pattern unpackRegex)
+ throws IOException{
+ File file = new File(toDir, name);
+ ensureDirectory(toDir);
+ try (OutputStream jar = new FileOutputStream(file);
+ TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) {
+ unJar(teeInputStream, toDir, unpackRegex);
+ }
+ }
+
+ /**
+ * Unpack matching files from a jar. Entries inside the jar that do
+ * not match the given pattern will be skipped.
+ *
* @param jarFile the .jar file to unpack
* @param toDir the destination directory into which to unpack the jar
* @param unpackRegex the pattern to match jar entries against
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 6e59574..1a60948 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
@@ -29,6 +31,7 @@ import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -54,6 +57,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Download a single URL to the local disk.
@@ -247,9 +251,21 @@ public class FSDownload implements Callable<Path> {
}
}
- private Path copy(Path sCopy, Path dstdir) throws IOException {
+ /**
+ * Localize files.
+ * @param destination destination directory
+ * @throws IOException cannot read or write file
+ * @throws YarnException subcommand returned an error
+ */
+ private void verifyAndCopy(Path destination)
+ throws IOException, YarnException {
+ final Path sCopy;
+ try {
+ sCopy = resource.getResource().toPath();
+ } catch (URISyntaxException e) {
+ throw new IOException("Invalid resource", e);
+ }
FileSystem sourceFs = sCopy.getFileSystem(conf);
- Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
FileStatus sStat = sourceFs.getFileStatus(sCopy);
if (sStat.getModificationTime() != resource.getTimestamp()) {
throw new IOException("Resource " + sCopy +
@@ -264,82 +280,108 @@ public class FSDownload implements Callable<Path> {
}
}
- FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
- true, conf);
- return dCopy;
+ downloadAndUnpack(sCopy, destination);
}
- private long unpack(File localrsrc, File dst) throws IOException {
- switch (resource.getType()) {
- case ARCHIVE: {
- String lowerDst = StringUtils.toLowerCase(dst.getName());
- if (lowerDst.endsWith(".jar")) {
- RunJar.unJar(localrsrc, dst);
- } else if (lowerDst.endsWith(".zip")) {
- FileUtil.unZip(localrsrc, dst);
- } else if (lowerDst.endsWith(".tar.gz") ||
- lowerDst.endsWith(".tgz") ||
- lowerDst.endsWith(".tar")) {
- FileUtil.unTar(localrsrc, dst);
+ /**
+ * Copy source path to destination with localization rules.
+ * @param source source path to copy. Typically HDFS
+ * @param destination destination path. Typically local filesystem
+ * @exception YarnException Any error has occurred
+ */
+ private void downloadAndUnpack(Path source, Path destination)
+ throws YarnException {
+ try {
+ FileSystem sourceFileSystem = source.getFileSystem(conf);
+ FileSystem destinationFileSystem = destination.getFileSystem(conf);
+ if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+ FileUtil.copy(
+ sourceFileSystem, source,
+ destinationFileSystem, destination, false,
+ true, conf);
} else {
- LOG.warn("Cannot unpack " + localrsrc);
- if (!localrsrc.renameTo(dst)) {
- throw new IOException("Unable to rename file: [" + localrsrc
- + "] to [" + dst + "]");
- }
+ unpack(source, destination, sourceFileSystem, destinationFileSystem);
}
+ } catch (Exception e) {
+ throw new YarnException("Download and unpack failed", e);
}
- break;
- case PATTERN: {
+ }
+
+ /**
+ * Do the localization action on the input stream.
+ * We use the deprecated method RunJar.unJarAndSave for compatibility reasons.
+ * We should use the more efficient RunJar.unJar in the future.
+ * @param source Source path
+ * @param destination Destination pth
+ * @param sourceFileSystem Source filesystem
+ * @param destinationFileSystem Destination filesystem
+ * @throws IOException Could not read or write stream
+ * @throws InterruptedException Operation interrupted by caller
+ * @throws ExecutionException Could not create thread pool execution
+ */
+ @SuppressWarnings("deprecation")
+ private void unpack(Path source, Path destination,
+ FileSystem sourceFileSystem,
+ FileSystem destinationFileSystem)
+ throws IOException, InterruptedException, ExecutionException {
+ try (InputStream inputStream = sourceFileSystem.open(source)) {
+ File dst = new File(destination.toUri());
String lowerDst = StringUtils.toLowerCase(dst.getName());
- if (lowerDst.endsWith(".jar")) {
- String p = resource.getPattern();
- RunJar.unJar(localrsrc, dst,
- p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
- File newDst = new File(dst, dst.getName());
- if (!dst.exists() && !dst.mkdir()) {
- throw new IOException("Unable to create directory: [" + dst + "]");
+ switch (resource.getType()) {
+ case ARCHIVE:
+ if (lowerDst.endsWith(".jar")) {
+ RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY);
+ } else if (lowerDst.endsWith(".zip")) {
+ FileUtil.unZip(inputStream, dst);
+ } else if (lowerDst.endsWith(".tar.gz") ||
+ lowerDst.endsWith(".tgz") ||
+ lowerDst.endsWith(".tar")) {
+ FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
+ } else {
+ LOG.warn("Cannot unpack " + source);
+ try (OutputStream outputStream =
+ destinationFileSystem.create(destination, true)) {
+ IOUtils.copy(inputStream, outputStream);
+ }
}
- if (!localrsrc.renameTo(newDst)) {
- throw new IOException("Unable to rename file: [" + localrsrc
- + "] to [" + newDst + "]");
+ break;
+ case PATTERN:
+ if (lowerDst.endsWith(".jar")) {
+ String p = resource.getPattern();
+ if (!dst.exists() && !dst.mkdir()) {
+ throw new IOException("Unable to create directory: [" + dst + "]");
+ }
+ RunJar.unJarAndSave(inputStream, dst, source.getName(),
+ p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
+ } else if (lowerDst.endsWith(".zip")) {
+ LOG.warn("Treating [" + source + "] as an archive even though it " +
+ "was specified as PATTERN");
+ FileUtil.unZip(inputStream, dst);
+ } else if (lowerDst.endsWith(".tar.gz") ||
+ lowerDst.endsWith(".tgz") ||
+ lowerDst.endsWith(".tar")) {
+ LOG.warn("Treating [" + source + "] as an archive even though it " +
+ "was specified as PATTERN");
+ FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
+ } else {
+ LOG.warn("Cannot unpack " + source);
+ try (OutputStream outputStream =
+ destinationFileSystem.create(destination, true)) {
+ IOUtils.copy(inputStream, outputStream);
+ }
}
- } else if (lowerDst.endsWith(".zip")) {
- LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
- "was specified as PATTERN");
- FileUtil.unZip(localrsrc, dst);
- } else if (lowerDst.endsWith(".tar.gz") ||
- lowerDst.endsWith(".tgz") ||
- lowerDst.endsWith(".tar")) {
- LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
- "was specified as PATTERN");
- FileUtil.unTar(localrsrc, dst);
- } else {
- LOG.warn("Cannot unpack " + localrsrc);
- if (!localrsrc.renameTo(dst)) {
- throw new IOException("Unable to rename file: [" + localrsrc
- + "] to [" + dst + "]");
+ break;
+ case FILE:
+ default:
+ try (OutputStream outputStream =
+ destinationFileSystem.create(destination, true)) {
+ IOUtils.copy(inputStream, outputStream);
}
+ break;
}
+ // TODO Should calculate here before returning
+ //return FileUtil.getDU(destDir);
}
- break;
- case FILE:
- default:
- if (!localrsrc.renameTo(dst)) {
- throw new IOException("Unable to rename file: [" + localrsrc
- + "] to [" + dst + "]");
- }
- break;
- }
- if(localrsrc.isFile()){
- try {
- files.delete(new Path(localrsrc.toString()), false);
- } catch (IOException ignore) {
- }
- }
- return 0;
- // TODO Should calculate here before returning
- //return FileUtil.getDU(destDir);
}
@Override
@@ -352,27 +394,34 @@ public class FSDownload implements Callable<Path> {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting to download " + sCopy);
+ LOG.debug(String.format("Starting to download %s %s %s",
+ sCopy,
+ resource.getType(),
+ resource.getPattern()));
}
- createDir(destDirPath, cachePerms);
- final Path dst_work = new Path(destDirPath + "_tmp");
- createDir(dst_work, cachePerms);
- Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
+ final Path destinationTmp = new Path(destDirPath + "_tmp");
+ createDir(destinationTmp, PRIVATE_DIR_PERMS);
+ Path dFinal =
+ files.makeQualified(new Path(destinationTmp, sCopy.getName()));
try {
- Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
- : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
- public Path run() throws Exception {
- return files.makeQualified(copy(sCopy, dst_work));
- };
- });
- unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
+ if (userUgi == null) {
+ verifyAndCopy(dFinal);
+ } else {
+ userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ verifyAndCopy(dFinal);
+ return null;
+ }
+ });
+ }
changePermissions(dFinal.getFileSystem(conf), dFinal);
- files.rename(dst_work, destDirPath, Rename.OVERWRITE);
+ files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
if (LOG.isDebugEnabled()) {
- LOG.debug("File has been downloaded to " +
- new Path(destDirPath, sCopy.getName()));
+ LOG.debug(String.format("File has been downloaded to %s from %s",
+ new Path(destDirPath, sCopy.getName()), sCopy));
}
} catch (Exception e) {
try {
@@ -382,7 +431,7 @@ public class FSDownload implements Callable<Path> {
throw e;
} finally {
try {
- files.delete(dst_work, true);
+ files.delete(destinationTmp, true);
} catch (FileNotFoundException ignore) {
}
conf = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 877dd08..fa8c039 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -82,6 +82,9 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+/**
+ * Unit test for the FSDownload class.
+ */
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -90,7 +93,8 @@ public class TestFSDownload {
private enum TEST_FILE_TYPE {
TAR, JAR, ZIP, TGZ
};
-
+ private Configuration conf = new Configuration();
+
@AfterClass
public static void deleteTestDir() throws IOException {
FileContext fs = FileContext.getLocalFSFileContext();
@@ -132,6 +136,18 @@ public class TestFSDownload {
FileOutputStream stream = new FileOutputStream(jarFile);
LOG.info("Create jar out stream ");
JarOutputStream out = new JarOutputStream(stream, new Manifest());
+ ZipEntry entry = new ZipEntry("classes/1.class");
+ out.putNextEntry(entry);
+ out.write(1);
+ out.write(2);
+ out.write(3);
+ out.closeEntry();
+ ZipEntry entry2 = new ZipEntry("classes/2.class");
+ out.putNextEntry(entry2);
+ out.write(1);
+ out.write(2);
+ out.write(3);
+ out.closeEntry();
LOG.info("Done writing jar stream ");
out.close();
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@@ -256,7 +272,6 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testDownloadBadPublic() throws IOException, URISyntaxException,
InterruptedException {
- Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
@@ -307,7 +322,6 @@ public class TestFSDownload {
@Test (timeout=60000)
public void testDownloadPublicWithStatCache() throws IOException,
URISyntaxException, InterruptedException, ExecutionException {
- final Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
@@ -382,7 +396,6 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testDownload() throws IOException, URISyntaxException,
InterruptedException {
- Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
@@ -438,7 +451,7 @@ public class TestFSDownload {
FileStatus status = files.getFileStatus(localized.getParent());
FsPermission perm = status.getPermission();
assertEquals("Cache directory permissions are incorrect",
- new FsPermission((short)0755), perm);
+ new FsPermission((short)0700), perm);
status = files.getFileStatus(localized);
perm = status.getPermission();
@@ -455,7 +468,6 @@ public class TestFSDownload {
private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
URISyntaxException, InterruptedException{
- Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
@@ -530,7 +542,7 @@ public class TestFSDownload {
}
}
- @Test (timeout=10000)
+ @Test (timeout=10000)
public void testDownloadArchive() throws IOException, URISyntaxException,
InterruptedException {
downloadWithFileType(TEST_FILE_TYPE.TAR);
@@ -542,7 +554,7 @@ public class TestFSDownload {
downloadWithFileType(TEST_FILE_TYPE.JAR);
}
- @Test (timeout=10000)
+ @Test (timeout=10000)
public void testDownloadArchiveZip() throws IOException, URISyntaxException,
InterruptedException {
downloadWithFileType(TEST_FILE_TYPE.ZIP);
@@ -603,7 +615,6 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testDirDownload() throws IOException, InterruptedException {
- Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
@@ -668,7 +679,6 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testUniqueDestinationPath() throws Exception {
- Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org