You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/30 18:10:01 UTC
[19/50] [abbrv] hadoop git commit: Revert "YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi"
Revert "YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi"
This reverts commit 1b0f265db1a5bfccf1d870912237ea9618bd9c34.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/901d15a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/901d15a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/901d15a3
Branch: refs/heads/YARN-6592
Commit: 901d15a30b9fc6c7015f4e2e2c06e6ee42a39662
Parents: 6463e10
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jan 30 08:34:39 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jan 30 08:34:39 2018 -0600
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileUtil.java | 251 +------------------
.../java/org/apache/hadoop/util/RunJar.java | 65 -----
.../org/apache/hadoop/yarn/util/FSDownload.java | 215 ++++++----------
.../apache/hadoop/yarn/util/TestFSDownload.java | 30 +--
4 files changed, 99 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index bf9b146..4d971aa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -20,35 +20,27 @@ package org.apache.hadoop.fs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
-import java.nio.charset.Charset;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
-import java.util.zip.ZipInputStream;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -83,11 +75,6 @@ public class FileUtil {
public static final int SYMLINK_NO_PRIVILEGE = 2;
/**
- * Buffer size for copy the content of compressed file to new file.
- */
- private static final int BUFFER_SIZE = 8_192;
-
- /**
* convert an array of FileStatus to an array of Path
*
* @param stats
@@ -539,22 +526,6 @@ public class FileUtil {
}
/**
- * Convert a os-native filename to a path that works for the shell
- * and avoids script injection attacks.
- * @param file The filename to convert
- * @return The unix pathname
- * @throws IOException on windows, there can be problems with the subprocess
- */
- public static String makeSecureShellPath(File file) throws IOException {
- if (Shell.WINDOWS) {
- // Currently it is never called, but it might be helpful in the future.
- throw new UnsupportedOperationException("Not implemented for Windows");
- } else {
- return makeShellPath(file, false).replace("'", "'\\''");
- }
- }
-
- /**
* Convert a os-native filename to a path that works for the shell.
* @param file The filename to convert
* @param makeCanonicalPath
@@ -605,48 +576,11 @@ public class FileUtil {
}
/**
- * Given a stream input it will unzip the it in the unzip directory.
- * passed as the second parameter
- * @param inputStream The zip file as input
- * @param toDir The unzip directory where to unzip the zip file.
- * @throws IOException an exception occurred
- */
- public static void unZip(InputStream inputStream, File toDir)
- throws IOException {
- try (ZipInputStream zip = new ZipInputStream(inputStream)) {
- int numOfFailedLastModifiedSet = 0;
- for(ZipEntry entry = zip.getNextEntry();
- entry != null;
- entry = zip.getNextEntry()) {
- if (!entry.isDirectory()) {
- File file = new File(toDir, entry.getName());
- File parent = file.getParentFile();
- if (!parent.mkdirs() &&
- !parent.isDirectory()) {
- throw new IOException("Mkdirs failed to create " +
- parent.getAbsolutePath());
- }
- try (OutputStream out = new FileOutputStream(file)) {
- IOUtils.copyBytes(zip, out, BUFFER_SIZE);
- }
- if (!file.setLastModified(entry.getTime())) {
- numOfFailedLastModifiedSet++;
- }
- }
- }
- if (numOfFailedLastModifiedSet > 0) {
- LOG.warn("Could not set last modfied time for {} file(s)",
- numOfFailedLastModifiedSet);
- }
- }
- }
-
- /**
- * Given a File input it will unzip it in the unzip directory.
+ * Given a File input it will unzip the file in a the unzip directory
* passed as the second parameter
* @param inFile The zip file as input
* @param unzipDir The unzip directory where to unzip the zip file.
- * @throws IOException An I/O exception has occurred
+ * @throws IOException
*/
public static void unZip(File inFile, File unzipDir) throws IOException {
Enumeration<? extends ZipEntry> entries;
@@ -687,138 +621,6 @@ public class FileUtil {
}
/**
- * Run a command and send the contents of an input stream to it.
- * @param inputStream Input stream to forward to the shell command
- * @param command shell command to run
- * @throws IOException read or write failed
- * @throws InterruptedException command interrupted
- * @throws ExecutionException task submit failed
- */
- private static void runCommandOnStream(
- InputStream inputStream, String command)
- throws IOException, InterruptedException, ExecutionException {
- ExecutorService executor = null;
- ProcessBuilder builder = new ProcessBuilder();
- builder.command(
- Shell.WINDOWS ? "cmd" : "bash",
- Shell.WINDOWS ? "/c" : "-c",
- command);
- Process process = builder.start();
- int exitCode;
- try {
- // Consume stdout and stderr, to avoid blocking the command
- executor = Executors.newFixedThreadPool(2);
- Future output = executor.submit(() -> {
- try {
- // Read until the output stream receives an EOF and closed.
- if (LOG.isDebugEnabled()) {
- // Log directly to avoid out of memory errors
- try (BufferedReader reader =
- new BufferedReader(
- new InputStreamReader(process.getInputStream(),
- Charset.forName("UTF-8")))) {
- String line;
- while((line = reader.readLine()) != null) {
- LOG.debug(line);
- }
- }
- } else {
- org.apache.commons.io.IOUtils.copy(
- process.getInputStream(),
- new IOUtils.NullOutputStream());
- }
- } catch (IOException e) {
- LOG.debug(e.getMessage());
- }
- });
- Future error = executor.submit(() -> {
- try {
- // Read until the error stream receives an EOF and closed.
- if (LOG.isDebugEnabled()) {
- // Log directly to avoid out of memory errors
- try (BufferedReader reader =
- new BufferedReader(
- new InputStreamReader(process.getErrorStream(),
- Charset.forName("UTF-8")))) {
- String line;
- while((line = reader.readLine()) != null) {
- LOG.debug(line);
- }
- }
- } else {
- org.apache.commons.io.IOUtils.copy(
- process.getErrorStream(),
- new IOUtils.NullOutputStream());
- }
- } catch (IOException e) {
- LOG.debug(e.getMessage());
- }
- });
-
- // Pass the input stream to the command to process
- try {
- org.apache.commons.io.IOUtils.copy(
- inputStream, process.getOutputStream());
- } finally {
- process.getOutputStream().close();
- }
-
- // Wait for both stdout and stderr futures to finish
- error.get();
- output.get();
- } finally {
- // Clean up the threads
- if (executor != null) {
- executor.shutdown();
- }
- // Wait to avoid leaking the child process
- exitCode = process.waitFor();
- }
-
- if (exitCode != 0) {
- throw new IOException(
- String.format(
- "Error executing command. %s " +
- "Process exited with exit code %d.",
- command, exitCode));
- }
- }
-
- /**
- * Given a Tar File as input it will untar the file in a the untar directory
- * passed as the second parameter
- *
- * This utility will untar ".tar" files and ".tar.gz","tgz" files.
- *
- * @param inputStream The tar file as input.
- * @param untarDir The untar directory where to untar the tar file.
- * @param gzipped The input stream is gzipped
- * TODO Use magic number and PusbackInputStream to identify
- * @throws IOException an exception occurred
- * @throws InterruptedException command interrupted
- * @throws ExecutionException task submit failed
- */
- public static void unTar(InputStream inputStream, File untarDir,
- boolean gzipped)
- throws IOException, InterruptedException, ExecutionException {
- if (!untarDir.mkdirs()) {
- if (!untarDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + untarDir);
- }
- }
-
- if(Shell.WINDOWS) {
- // Tar is not native to Windows. Use simple Java based implementation for
- // tests and simple tar archives
- unTarUsingJava(inputStream, untarDir, gzipped);
- } else {
- // spawn tar utility to untar archive for full fledged unix behavior such
- // as resolving symlinks in tar archives
- unTarUsingTar(inputStream, untarDir, gzipped);
- }
- }
-
- /**
* Given a Tar File as input it will untar the file in a the untar directory
* passed as the second parameter
*
@@ -848,41 +650,23 @@ public class FileUtil {
}
}
- private static void unTarUsingTar(InputStream inputStream, File untarDir,
- boolean gzipped)
- throws IOException, InterruptedException, ExecutionException {
- StringBuilder untarCommand = new StringBuilder();
- if (gzipped) {
- untarCommand.append("gzip -dc | (");
- }
- untarCommand.append("cd '");
- untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
- untarCommand.append("' && ");
- untarCommand.append("tar -x ");
-
- if (gzipped) {
- untarCommand.append(")");
- }
- runCommandOnStream(inputStream, untarCommand.toString());
- }
-
private static void unTarUsingTar(File inFile, File untarDir,
boolean gzipped) throws IOException {
StringBuffer untarCommand = new StringBuffer();
if (gzipped) {
untarCommand.append(" gzip -dc '");
- untarCommand.append(FileUtil.makeSecureShellPath(inFile));
+ untarCommand.append(FileUtil.makeShellPath(inFile));
untarCommand.append("' | (");
}
untarCommand.append("cd '");
- untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
- untarCommand.append("' && ");
+ untarCommand.append(FileUtil.makeShellPath(untarDir));
+ untarCommand.append("' ; ");
untarCommand.append("tar -xf ");
if (gzipped) {
untarCommand.append(" -)");
} else {
- untarCommand.append(FileUtil.makeSecureShellPath(inFile));
+ untarCommand.append(FileUtil.makeShellPath(inFile));
}
String[] shellCmd = { "bash", "-c", untarCommand.toString() };
ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
@@ -917,29 +701,6 @@ public class FileUtil {
}
}
- private static void unTarUsingJava(InputStream inputStream, File untarDir,
- boolean gzipped) throws IOException {
- TarArchiveInputStream tis = null;
- try {
- if (gzipped) {
- inputStream = new BufferedInputStream(new GZIPInputStream(
- inputStream));
- } else {
- inputStream =
- new BufferedInputStream(inputStream);
- }
-
- tis = new TarArchiveInputStream(inputStream);
-
- for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
- unpackEntries(tis, entry, untarDir);
- entry = tis.getNextTarEntry();
- }
- } finally {
- IOUtils.cleanupWithLogger(LOG, tis, inputStream);
- }
- }
-
private static void unpackEntries(TarArchiveInputStream tis,
TarArchiveEntry entry, File outputDir) throws IOException {
if (entry.isDirectory()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
index 89b7d76..19b51ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
@@ -34,11 +34,9 @@ import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
-import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
-import org.apache.commons.io.input.TeeInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileUtil;
@@ -100,69 +98,6 @@ public class RunJar {
* Unpack matching files from a jar. Entries inside the jar that do
* not match the given pattern will be skipped.
*
- * @param inputStream the jar stream to unpack
- * @param toDir the destination directory into which to unpack the jar
- * @param unpackRegex the pattern to match jar entries against
- *
- * @throws IOException if an I/O error has occurred or toDir
- * cannot be created and does not already exist
- */
- public static void unJar(InputStream inputStream, File toDir,
- Pattern unpackRegex)
- throws IOException {
- try (JarInputStream jar = new JarInputStream(inputStream)) {
- int numOfFailedLastModifiedSet = 0;
- for (JarEntry entry = jar.getNextJarEntry();
- entry != null;
- entry = jar.getNextJarEntry()) {
- if (!entry.isDirectory() &&
- unpackRegex.matcher(entry.getName()).matches()) {
- File file = new File(toDir, entry.getName());
- ensureDirectory(file.getParentFile());
- try (OutputStream out = new FileOutputStream(file)) {
- IOUtils.copyBytes(jar, out, BUFFER_SIZE);
- }
- if (!file.setLastModified(entry.getTime())) {
- numOfFailedLastModifiedSet++;
- }
- }
- }
- if (numOfFailedLastModifiedSet > 0) {
- LOG.warn("Could not set last modfied time for {} file(s)",
- numOfFailedLastModifiedSet);
- }
- }
- }
-
- /**
- * Unpack matching files from a jar. Entries inside the jar that do
- * not match the given pattern will be skipped. Keep also a copy
- * of the entire jar in the same directory for backward compatibility.
- * TODO remove this feature in a new release and do only unJar
- *
- * @param inputStream the jar stream to unpack
- * @param toDir the destination directory into which to unpack the jar
- * @param unpackRegex the pattern to match jar entries against
- *
- * @throws IOException if an I/O error has occurred or toDir
- * cannot be created and does not already exist
- */
- @Deprecated
- public static void unJarAndSave(InputStream inputStream, File toDir,
- String name, Pattern unpackRegex)
- throws IOException{
- File file = new File(toDir, name);
- ensureDirectory(toDir);
- try (OutputStream jar = new FileOutputStream(file);
- TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) {
- unJar(teeInputStream, toDir, unpackRegex);
- }
- }
-
- /**
- * Unpack matching files from a jar. Entries inside the jar that do
- * not match the given pattern will be skipped.
- *
* @param jarFile the .jar file to unpack
* @param toDir the destination directory into which to unpack the jar
* @param unpackRegex the pattern to match jar entries against
http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 1a60948..6e59574 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
@@ -31,7 +29,6 @@ import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -57,7 +54,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
-import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Download a single URL to the local disk.
@@ -251,21 +247,9 @@ public class FSDownload implements Callable<Path> {
}
}
- /**
- * Localize files.
- * @param destination destination directory
- * @throws IOException cannot read or write file
- * @throws YarnException subcommand returned an error
- */
- private void verifyAndCopy(Path destination)
- throws IOException, YarnException {
- final Path sCopy;
- try {
- sCopy = resource.getResource().toPath();
- } catch (URISyntaxException e) {
- throw new IOException("Invalid resource", e);
- }
+ private Path copy(Path sCopy, Path dstdir) throws IOException {
FileSystem sourceFs = sCopy.getFileSystem(conf);
+ Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
FileStatus sStat = sourceFs.getFileStatus(sCopy);
if (sStat.getModificationTime() != resource.getTimestamp()) {
throw new IOException("Resource " + sCopy +
@@ -280,108 +264,82 @@ public class FSDownload implements Callable<Path> {
}
}
- downloadAndUnpack(sCopy, destination);
+ FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
+ true, conf);
+ return dCopy;
}
- /**
- * Copy source path to destination with localization rules.
- * @param source source path to copy. Typically HDFS
- * @param destination destination path. Typically local filesystem
- * @exception YarnException Any error has occurred
- */
- private void downloadAndUnpack(Path source, Path destination)
- throws YarnException {
- try {
- FileSystem sourceFileSystem = source.getFileSystem(conf);
- FileSystem destinationFileSystem = destination.getFileSystem(conf);
- if (sourceFileSystem.getFileStatus(source).isDirectory()) {
- FileUtil.copy(
- sourceFileSystem, source,
- destinationFileSystem, destination, false,
- true, conf);
+ private long unpack(File localrsrc, File dst) throws IOException {
+ switch (resource.getType()) {
+ case ARCHIVE: {
+ String lowerDst = StringUtils.toLowerCase(dst.getName());
+ if (lowerDst.endsWith(".jar")) {
+ RunJar.unJar(localrsrc, dst);
+ } else if (lowerDst.endsWith(".zip")) {
+ FileUtil.unZip(localrsrc, dst);
+ } else if (lowerDst.endsWith(".tar.gz") ||
+ lowerDst.endsWith(".tgz") ||
+ lowerDst.endsWith(".tar")) {
+ FileUtil.unTar(localrsrc, dst);
} else {
- unpack(source, destination, sourceFileSystem, destinationFileSystem);
+ LOG.warn("Cannot unpack " + localrsrc);
+ if (!localrsrc.renameTo(dst)) {
+ throw new IOException("Unable to rename file: [" + localrsrc
+ + "] to [" + dst + "]");
+ }
}
- } catch (Exception e) {
- throw new YarnException("Download and unpack failed", e);
}
- }
-
- /**
- * Do the localization action on the input stream.
- * We use the deprecated method RunJar.unJarAndSave for compatibility reasons.
- * We should use the more efficient RunJar.unJar in the future.
- * @param source Source path
- * @param destination Destination pth
- * @param sourceFileSystem Source filesystem
- * @param destinationFileSystem Destination filesystem
- * @throws IOException Could not read or write stream
- * @throws InterruptedException Operation interrupted by caller
- * @throws ExecutionException Could not create thread pool execution
- */
- @SuppressWarnings("deprecation")
- private void unpack(Path source, Path destination,
- FileSystem sourceFileSystem,
- FileSystem destinationFileSystem)
- throws IOException, InterruptedException, ExecutionException {
- try (InputStream inputStream = sourceFileSystem.open(source)) {
- File dst = new File(destination.toUri());
+ break;
+ case PATTERN: {
String lowerDst = StringUtils.toLowerCase(dst.getName());
- switch (resource.getType()) {
- case ARCHIVE:
- if (lowerDst.endsWith(".jar")) {
- RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY);
- } else if (lowerDst.endsWith(".zip")) {
- FileUtil.unZip(inputStream, dst);
- } else if (lowerDst.endsWith(".tar.gz") ||
- lowerDst.endsWith(".tgz") ||
- lowerDst.endsWith(".tar")) {
- FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
- } else {
- LOG.warn("Cannot unpack " + source);
- try (OutputStream outputStream =
- destinationFileSystem.create(destination, true)) {
- IOUtils.copy(inputStream, outputStream);
- }
+ if (lowerDst.endsWith(".jar")) {
+ String p = resource.getPattern();
+ RunJar.unJar(localrsrc, dst,
+ p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
+ File newDst = new File(dst, dst.getName());
+ if (!dst.exists() && !dst.mkdir()) {
+ throw new IOException("Unable to create directory: [" + dst + "]");
}
- break;
- case PATTERN:
- if (lowerDst.endsWith(".jar")) {
- String p = resource.getPattern();
- if (!dst.exists() && !dst.mkdir()) {
- throw new IOException("Unable to create directory: [" + dst + "]");
- }
- RunJar.unJarAndSave(inputStream, dst, source.getName(),
- p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
- } else if (lowerDst.endsWith(".zip")) {
- LOG.warn("Treating [" + source + "] as an archive even though it " +
- "was specified as PATTERN");
- FileUtil.unZip(inputStream, dst);
- } else if (lowerDst.endsWith(".tar.gz") ||
- lowerDst.endsWith(".tgz") ||
- lowerDst.endsWith(".tar")) {
- LOG.warn("Treating [" + source + "] as an archive even though it " +
- "was specified as PATTERN");
- FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
- } else {
- LOG.warn("Cannot unpack " + source);
- try (OutputStream outputStream =
- destinationFileSystem.create(destination, true)) {
- IOUtils.copy(inputStream, outputStream);
- }
+ if (!localrsrc.renameTo(newDst)) {
+ throw new IOException("Unable to rename file: [" + localrsrc
+ + "] to [" + newDst + "]");
}
- break;
- case FILE:
- default:
- try (OutputStream outputStream =
- destinationFileSystem.create(destination, true)) {
- IOUtils.copy(inputStream, outputStream);
+ } else if (lowerDst.endsWith(".zip")) {
+ LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
+ "was specified as PATTERN");
+ FileUtil.unZip(localrsrc, dst);
+ } else if (lowerDst.endsWith(".tar.gz") ||
+ lowerDst.endsWith(".tgz") ||
+ lowerDst.endsWith(".tar")) {
+ LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
+ "was specified as PATTERN");
+ FileUtil.unTar(localrsrc, dst);
+ } else {
+ LOG.warn("Cannot unpack " + localrsrc);
+ if (!localrsrc.renameTo(dst)) {
+ throw new IOException("Unable to rename file: [" + localrsrc
+ + "] to [" + dst + "]");
}
- break;
}
- // TODO Should calculate here before returning
- //return FileUtil.getDU(destDir);
}
+ break;
+ case FILE:
+ default:
+ if (!localrsrc.renameTo(dst)) {
+ throw new IOException("Unable to rename file: [" + localrsrc
+ + "] to [" + dst + "]");
+ }
+ break;
+ }
+ if(localrsrc.isFile()){
+ try {
+ files.delete(new Path(localrsrc.toString()), false);
+ } catch (IOException ignore) {
+ }
+ }
+ return 0;
+ // TODO Should calculate here before returning
+ //return FileUtil.getDU(destDir);
}
@Override
@@ -394,34 +352,27 @@ public class FSDownload implements Callable<Path> {
}
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Starting to download %s %s %s",
- sCopy,
- resource.getType(),
- resource.getPattern()));
+ LOG.debug("Starting to download " + sCopy);
}
- final Path destinationTmp = new Path(destDirPath + "_tmp");
- createDir(destinationTmp, PRIVATE_DIR_PERMS);
- Path dFinal =
- files.makeQualified(new Path(destinationTmp, sCopy.getName()));
+ createDir(destDirPath, cachePerms);
+ final Path dst_work = new Path(destDirPath + "_tmp");
+ createDir(dst_work, cachePerms);
+ Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
try {
- if (userUgi == null) {
- verifyAndCopy(dFinal);
- } else {
- userUgi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- verifyAndCopy(dFinal);
- return null;
- }
- });
- }
+ Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
+ : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
+ public Path run() throws Exception {
+ return files.makeQualified(copy(sCopy, dst_work));
+ };
+ });
+ unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
changePermissions(dFinal.getFileSystem(conf), dFinal);
- files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
+ files.rename(dst_work, destDirPath, Rename.OVERWRITE);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("File has been downloaded to %s from %s",
- new Path(destDirPath, sCopy.getName()), sCopy));
+ LOG.debug("File has been downloaded to " +
+ new Path(destDirPath, sCopy.getName()));
}
} catch (Exception e) {
try {
@@ -431,7 +382,7 @@ public class FSDownload implements Callable<Path> {
throw e;
} finally {
try {
- files.delete(destinationTmp, true);
+ files.delete(dst_work, true);
} catch (FileNotFoundException ignore) {
}
conf = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index fa8c039..877dd08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -82,9 +82,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-/**
- * Unit test for the FSDownload class.
- */
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -93,8 +90,7 @@ public class TestFSDownload {
private enum TEST_FILE_TYPE {
TAR, JAR, ZIP, TGZ
};
- private Configuration conf = new Configuration();
-
+
@AfterClass
public static void deleteTestDir() throws IOException {
FileContext fs = FileContext.getLocalFSFileContext();
@@ -136,18 +132,6 @@ public class TestFSDownload {
FileOutputStream stream = new FileOutputStream(jarFile);
LOG.info("Create jar out stream ");
JarOutputStream out = new JarOutputStream(stream, new Manifest());
- ZipEntry entry = new ZipEntry("classes/1.class");
- out.putNextEntry(entry);
- out.write(1);
- out.write(2);
- out.write(3);
- out.closeEntry();
- ZipEntry entry2 = new ZipEntry("classes/2.class");
- out.putNextEntry(entry2);
- out.write(1);
- out.write(2);
- out.write(3);
- out.closeEntry();
LOG.info("Done writing jar stream ");
out.close();
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@@ -272,6 +256,7 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testDownloadBadPublic() throws IOException, URISyntaxException,
InterruptedException {
+ Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
@@ -322,6 +307,7 @@ public class TestFSDownload {
@Test (timeout=60000)
public void testDownloadPublicWithStatCache() throws IOException,
URISyntaxException, InterruptedException, ExecutionException {
+ final Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
@@ -396,6 +382,7 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testDownload() throws IOException, URISyntaxException,
InterruptedException {
+ Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
@@ -451,7 +438,7 @@ public class TestFSDownload {
FileStatus status = files.getFileStatus(localized.getParent());
FsPermission perm = status.getPermission();
assertEquals("Cache directory permissions are incorrect",
- new FsPermission((short)0700), perm);
+ new FsPermission((short)0755), perm);
status = files.getFileStatus(localized);
perm = status.getPermission();
@@ -468,6 +455,7 @@ public class TestFSDownload {
private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
URISyntaxException, InterruptedException{
+ Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
@@ -542,7 +530,7 @@ public class TestFSDownload {
}
}
- @Test (timeout=10000)
+ @Test (timeout=10000)
public void testDownloadArchive() throws IOException, URISyntaxException,
InterruptedException {
downloadWithFileType(TEST_FILE_TYPE.TAR);
@@ -554,7 +542,7 @@ public class TestFSDownload {
downloadWithFileType(TEST_FILE_TYPE.JAR);
}
- @Test (timeout=10000)
+ @Test (timeout=10000)
public void testDownloadArchiveZip() throws IOException, URISyntaxException,
InterruptedException {
downloadWithFileType(TEST_FILE_TYPE.ZIP);
@@ -615,6 +603,7 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testDirDownload() throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
@@ -679,6 +668,7 @@ public class TestFSDownload {
@Test (timeout=10000)
public void testUniqueDestinationPath() throws Exception {
+ Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org