You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/30 18:10:01 UTC

[19/50] [abbrv] hadoop git commit: Revert "YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi"

Revert "YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi"

This reverts commit 1b0f265db1a5bfccf1d870912237ea9618bd9c34.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/901d15a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/901d15a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/901d15a3

Branch: refs/heads/YARN-6592
Commit: 901d15a30b9fc6c7015f4e2e2c06e6ee42a39662
Parents: 6463e10
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jan 30 08:34:39 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jan 30 08:34:39 2018 -0600

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileUtil.java     | 251 +------------------
 .../java/org/apache/hadoop/util/RunJar.java     |  65 -----
 .../org/apache/hadoop/yarn/util/FSDownload.java | 215 ++++++----------
 .../apache/hadoop/yarn/util/TestFSDownload.java |  30 +--
 4 files changed, 99 insertions(+), 462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index bf9b146..4d971aa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -20,35 +20,27 @@ package org.apache.hadoop.fs;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.nio.charset.Charset;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
-import java.util.zip.ZipInputStream;
 
 import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -83,11 +75,6 @@ public class FileUtil {
   public static final int SYMLINK_NO_PRIVILEGE = 2;
 
   /**
-   * Buffer size for copy the content of compressed file to new file.
-   */
-  private static final int BUFFER_SIZE = 8_192;
-
-  /**
    * convert an array of FileStatus to an array of Path
    *
    * @param stats
@@ -539,22 +526,6 @@ public class FileUtil {
   }
 
   /**
-   * Convert a os-native filename to a path that works for the shell
-   * and avoids script injection attacks.
-   * @param file The filename to convert
-   * @return The unix pathname
-   * @throws IOException on windows, there can be problems with the subprocess
-   */
-  public static String makeSecureShellPath(File file) throws IOException {
-    if (Shell.WINDOWS) {
-      // Currently it is never called, but it might be helpful in the future.
-      throw new UnsupportedOperationException("Not implemented for Windows");
-    } else {
-      return makeShellPath(file, false).replace("'", "'\\''");
-    }
-  }
-
-  /**
    * Convert a os-native filename to a path that works for the shell.
    * @param file The filename to convert
    * @param makeCanonicalPath
@@ -605,48 +576,11 @@ public class FileUtil {
   }
 
   /**
-   * Given a stream input it will unzip the it in the unzip directory.
-   * passed as the second parameter
-   * @param inputStream The zip file as input
-   * @param toDir The unzip directory where to unzip the zip file.
-   * @throws IOException an exception occurred
-   */
-  public static void unZip(InputStream inputStream, File toDir)
-      throws IOException {
-    try (ZipInputStream zip = new ZipInputStream(inputStream)) {
-      int numOfFailedLastModifiedSet = 0;
-      for(ZipEntry entry = zip.getNextEntry();
-          entry != null;
-          entry = zip.getNextEntry()) {
-        if (!entry.isDirectory()) {
-          File file = new File(toDir, entry.getName());
-          File parent = file.getParentFile();
-          if (!parent.mkdirs() &&
-              !parent.isDirectory()) {
-            throw new IOException("Mkdirs failed to create " +
-                parent.getAbsolutePath());
-          }
-          try (OutputStream out = new FileOutputStream(file)) {
-            IOUtils.copyBytes(zip, out, BUFFER_SIZE);
-          }
-          if (!file.setLastModified(entry.getTime())) {
-            numOfFailedLastModifiedSet++;
-          }
-        }
-      }
-      if (numOfFailedLastModifiedSet > 0) {
-        LOG.warn("Could not set last modfied time for {} file(s)",
-            numOfFailedLastModifiedSet);
-      }
-    }
-  }
-
-  /**
-   * Given a File input it will unzip it in the unzip directory.
+   * Given a File input it will unzip the file in a the unzip directory
    * passed as the second parameter
    * @param inFile The zip file as input
    * @param unzipDir The unzip directory where to unzip the zip file.
-   * @throws IOException An I/O exception has occurred
+   * @throws IOException
    */
   public static void unZip(File inFile, File unzipDir) throws IOException {
     Enumeration<? extends ZipEntry> entries;
@@ -687,138 +621,6 @@ public class FileUtil {
   }
 
   /**
-   * Run a command and send the contents of an input stream to it.
-   * @param inputStream Input stream to forward to the shell command
-   * @param command shell command to run
-   * @throws IOException read or write failed
-   * @throws InterruptedException command interrupted
-   * @throws ExecutionException task submit failed
-   */
-  private static void runCommandOnStream(
-      InputStream inputStream, String command)
-      throws IOException, InterruptedException, ExecutionException {
-    ExecutorService executor = null;
-    ProcessBuilder builder = new ProcessBuilder();
-    builder.command(
-        Shell.WINDOWS ? "cmd" : "bash",
-        Shell.WINDOWS ? "/c" : "-c",
-        command);
-    Process process = builder.start();
-    int exitCode;
-    try {
-      // Consume stdout and stderr, to avoid blocking the command
-      executor = Executors.newFixedThreadPool(2);
-      Future output = executor.submit(() -> {
-        try {
-          // Read until the output stream receives an EOF and closed.
-          if (LOG.isDebugEnabled()) {
-            // Log directly to avoid out of memory errors
-            try (BufferedReader reader =
-                     new BufferedReader(
-                         new InputStreamReader(process.getInputStream(),
-                             Charset.forName("UTF-8")))) {
-              String line;
-              while((line = reader.readLine()) != null) {
-                LOG.debug(line);
-              }
-            }
-          } else {
-            org.apache.commons.io.IOUtils.copy(
-                process.getInputStream(),
-                new IOUtils.NullOutputStream());
-          }
-        } catch (IOException e) {
-          LOG.debug(e.getMessage());
-        }
-      });
-      Future error = executor.submit(() -> {
-        try {
-          // Read until the error stream receives an EOF and closed.
-          if (LOG.isDebugEnabled()) {
-            // Log directly to avoid out of memory errors
-            try (BufferedReader reader =
-                     new BufferedReader(
-                         new InputStreamReader(process.getErrorStream(),
-                             Charset.forName("UTF-8")))) {
-              String line;
-              while((line = reader.readLine()) != null) {
-                LOG.debug(line);
-              }
-            }
-          } else {
-            org.apache.commons.io.IOUtils.copy(
-                process.getErrorStream(),
-                new IOUtils.NullOutputStream());
-          }
-        } catch (IOException e) {
-          LOG.debug(e.getMessage());
-        }
-      });
-
-      // Pass the input stream to the command to process
-      try {
-        org.apache.commons.io.IOUtils.copy(
-            inputStream, process.getOutputStream());
-      } finally {
-        process.getOutputStream().close();
-      }
-
-      // Wait for both stdout and stderr futures to finish
-      error.get();
-      output.get();
-    } finally {
-      // Clean up the threads
-      if (executor != null) {
-        executor.shutdown();
-      }
-      // Wait to avoid leaking the child process
-      exitCode = process.waitFor();
-    }
-
-    if (exitCode != 0) {
-      throw new IOException(
-          String.format(
-              "Error executing command. %s " +
-                  "Process exited with exit code %d.",
-              command, exitCode));
-    }
-  }
-
-  /**
-   * Given a Tar File as input it will untar the file in a the untar directory
-   * passed as the second parameter
-   *
-   * This utility will untar ".tar" files and ".tar.gz","tgz" files.
-   *
-   * @param inputStream The tar file as input.
-   * @param untarDir The untar directory where to untar the tar file.
-   * @param gzipped The input stream is gzipped
-   *                TODO Use magic number and PusbackInputStream to identify
-   * @throws IOException an exception occurred
-   * @throws InterruptedException command interrupted
-   * @throws ExecutionException task submit failed
-   */
-  public static void unTar(InputStream inputStream, File untarDir,
-                           boolean gzipped)
-      throws IOException, InterruptedException, ExecutionException {
-    if (!untarDir.mkdirs()) {
-      if (!untarDir.isDirectory()) {
-        throw new IOException("Mkdirs failed to create " + untarDir);
-      }
-    }
-
-    if(Shell.WINDOWS) {
-      // Tar is not native to Windows. Use simple Java based implementation for
-      // tests and simple tar archives
-      unTarUsingJava(inputStream, untarDir, gzipped);
-    } else {
-      // spawn tar utility to untar archive for full fledged unix behavior such
-      // as resolving symlinks in tar archives
-      unTarUsingTar(inputStream, untarDir, gzipped);
-    }
-  }
-
-  /**
    * Given a Tar File as input it will untar the file in a the untar directory
    * passed as the second parameter
    *
@@ -848,41 +650,23 @@ public class FileUtil {
     }
   }
 
-  private static void unTarUsingTar(InputStream inputStream, File untarDir,
-                                    boolean gzipped)
-      throws IOException, InterruptedException, ExecutionException {
-    StringBuilder untarCommand = new StringBuilder();
-    if (gzipped) {
-      untarCommand.append("gzip -dc | (");
-    }
-    untarCommand.append("cd '");
-    untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
-    untarCommand.append("' && ");
-    untarCommand.append("tar -x ");
-
-    if (gzipped) {
-      untarCommand.append(")");
-    }
-    runCommandOnStream(inputStream, untarCommand.toString());
-  }
-
   private static void unTarUsingTar(File inFile, File untarDir,
       boolean gzipped) throws IOException {
     StringBuffer untarCommand = new StringBuffer();
     if (gzipped) {
       untarCommand.append(" gzip -dc '");
-      untarCommand.append(FileUtil.makeSecureShellPath(inFile));
+      untarCommand.append(FileUtil.makeShellPath(inFile));
       untarCommand.append("' | (");
     }
     untarCommand.append("cd '");
-    untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
-    untarCommand.append("' && ");
+    untarCommand.append(FileUtil.makeShellPath(untarDir));
+    untarCommand.append("' ; ");
     untarCommand.append("tar -xf ");
 
     if (gzipped) {
       untarCommand.append(" -)");
     } else {
-      untarCommand.append(FileUtil.makeSecureShellPath(inFile));
+      untarCommand.append(FileUtil.makeShellPath(inFile));
     }
     String[] shellCmd = { "bash", "-c", untarCommand.toString() };
     ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
@@ -917,29 +701,6 @@ public class FileUtil {
     }
   }
 
-  private static void unTarUsingJava(InputStream inputStream, File untarDir,
-                                     boolean gzipped) throws IOException {
-    TarArchiveInputStream tis = null;
-    try {
-      if (gzipped) {
-        inputStream = new BufferedInputStream(new GZIPInputStream(
-            inputStream));
-      } else {
-        inputStream =
-            new BufferedInputStream(inputStream);
-      }
-
-      tis = new TarArchiveInputStream(inputStream);
-
-      for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
-        unpackEntries(tis, entry, untarDir);
-        entry = tis.getNextTarEntry();
-      }
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, tis, inputStream);
-    }
-  }
-
   private static void unpackEntries(TarArchiveInputStream tis,
       TarArchiveEntry entry, File outputDir) throws IOException {
     if (entry.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
index 89b7d76..19b51ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
@@ -34,11 +34,9 @@ import java.util.Enumeration;
 import java.util.List;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
-import java.util.jar.JarInputStream;
 import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 
-import org.apache.commons.io.input.TeeInputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileUtil;
@@ -100,69 +98,6 @@ public class RunJar {
    * Unpack matching files from a jar. Entries inside the jar that do
    * not match the given pattern will be skipped.
    *
-   * @param inputStream the jar stream to unpack
-   * @param toDir the destination directory into which to unpack the jar
-   * @param unpackRegex the pattern to match jar entries against
-   *
-   * @throws IOException if an I/O error has occurred or toDir
-   * cannot be created and does not already exist
-   */
-  public static void unJar(InputStream inputStream, File toDir,
-                           Pattern unpackRegex)
-      throws IOException {
-    try (JarInputStream jar = new JarInputStream(inputStream)) {
-      int numOfFailedLastModifiedSet = 0;
-      for (JarEntry entry = jar.getNextJarEntry();
-           entry != null;
-           entry = jar.getNextJarEntry()) {
-        if (!entry.isDirectory() &&
-            unpackRegex.matcher(entry.getName()).matches()) {
-          File file = new File(toDir, entry.getName());
-          ensureDirectory(file.getParentFile());
-          try (OutputStream out = new FileOutputStream(file)) {
-            IOUtils.copyBytes(jar, out, BUFFER_SIZE);
-          }
-          if (!file.setLastModified(entry.getTime())) {
-            numOfFailedLastModifiedSet++;
-          }
-        }
-      }
-      if (numOfFailedLastModifiedSet > 0) {
-        LOG.warn("Could not set last modfied time for {} file(s)",
-            numOfFailedLastModifiedSet);
-      }
-    }
-  }
-
-  /**
-   * Unpack matching files from a jar. Entries inside the jar that do
-   * not match the given pattern will be skipped. Keep also a copy
-   * of the entire jar in the same directory for backward compatibility.
-   * TODO remove this feature in a new release and do only unJar
-   *
-   * @param inputStream the jar stream to unpack
-   * @param toDir the destination directory into which to unpack the jar
-   * @param unpackRegex the pattern to match jar entries against
-   *
-   * @throws IOException if an I/O error has occurred or toDir
-   * cannot be created and does not already exist
-   */
-  @Deprecated
-  public static void unJarAndSave(InputStream inputStream, File toDir,
-                           String name, Pattern unpackRegex)
-      throws IOException{
-    File file = new File(toDir, name);
-    ensureDirectory(toDir);
-    try (OutputStream jar = new FileOutputStream(file);
-         TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) {
-      unJar(teeInputStream, toDir, unpackRegex);
-    }
-  }
-
-  /**
-   * Unpack matching files from a jar. Entries inside the jar that do
-   * not match the given pattern will be skipped.
-   *
    * @param jarFile the .jar file to unpack
    * @param toDir the destination directory into which to unpack the jar
    * @param unpackRegex the pattern to match jar entries against

http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 1a60948..6e59574 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.util;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
@@ -31,7 +29,6 @@ import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -57,7 +54,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.Futures;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 
 /**
  * Download a single URL to the local disk.
@@ -251,21 +247,9 @@ public class FSDownload implements Callable<Path> {
     }
   }
 
-  /**
-   * Localize files.
-   * @param destination destination directory
-   * @throws IOException cannot read or write file
-   * @throws YarnException subcommand returned an error
-   */
-  private void verifyAndCopy(Path destination)
-      throws IOException, YarnException {
-    final Path sCopy;
-    try {
-      sCopy = resource.getResource().toPath();
-    } catch (URISyntaxException e) {
-      throw new IOException("Invalid resource", e);
-    }
+  private Path copy(Path sCopy, Path dstdir) throws IOException {
     FileSystem sourceFs = sCopy.getFileSystem(conf);
+    Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
     FileStatus sStat = sourceFs.getFileStatus(sCopy);
     if (sStat.getModificationTime() != resource.getTimestamp()) {
       throw new IOException("Resource " + sCopy +
@@ -280,108 +264,82 @@ public class FSDownload implements Callable<Path> {
       }
     }
 
-    downloadAndUnpack(sCopy, destination);
+    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
+        true, conf);
+    return dCopy;
   }
 
-  /**
-   * Copy source path to destination with localization rules.
-   * @param source source path to copy. Typically HDFS
-   * @param destination destination path. Typically local filesystem
-   * @exception YarnException Any error has occurred
-   */
-  private void downloadAndUnpack(Path source, Path destination)
-      throws YarnException {
-    try {
-      FileSystem sourceFileSystem = source.getFileSystem(conf);
-      FileSystem destinationFileSystem = destination.getFileSystem(conf);
-      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
-        FileUtil.copy(
-            sourceFileSystem, source,
-            destinationFileSystem, destination, false,
-            true, conf);
+  private long unpack(File localrsrc, File dst) throws IOException {
+    switch (resource.getType()) {
+    case ARCHIVE: {
+      String lowerDst = StringUtils.toLowerCase(dst.getName());
+      if (lowerDst.endsWith(".jar")) {
+        RunJar.unJar(localrsrc, dst);
+      } else if (lowerDst.endsWith(".zip")) {
+        FileUtil.unZip(localrsrc, dst);
+      } else if (lowerDst.endsWith(".tar.gz") ||
+                 lowerDst.endsWith(".tgz") ||
+                 lowerDst.endsWith(".tar")) {
+        FileUtil.unTar(localrsrc, dst);
       } else {
-        unpack(source, destination, sourceFileSystem, destinationFileSystem);
+        LOG.warn("Cannot unpack " + localrsrc);
+        if (!localrsrc.renameTo(dst)) {
+            throw new IOException("Unable to rename file: [" + localrsrc
+              + "] to [" + dst + "]");
+        }
       }
-    } catch (Exception e) {
-      throw new YarnException("Download and unpack failed", e);
     }
-  }
-
-  /**
-   * Do the localization action on the input stream.
-   * We use the deprecated method RunJar.unJarAndSave for compatibility reasons.
-   * We should use the more efficient RunJar.unJar in the future.
-   * @param source Source path
-   * @param destination Destination pth
-   * @param sourceFileSystem Source filesystem
-   * @param destinationFileSystem Destination filesystem
-   * @throws IOException Could not read or write stream
-   * @throws InterruptedException Operation interrupted by caller
-   * @throws ExecutionException Could not create thread pool execution
-   */
-  @SuppressWarnings("deprecation")
-  private void unpack(Path source, Path destination,
-                      FileSystem sourceFileSystem,
-                      FileSystem destinationFileSystem)
-      throws IOException, InterruptedException, ExecutionException {
-    try (InputStream inputStream = sourceFileSystem.open(source)) {
-      File dst = new File(destination.toUri());
+    break;
+    case PATTERN: {
       String lowerDst = StringUtils.toLowerCase(dst.getName());
-      switch (resource.getType()) {
-      case ARCHIVE:
-        if (lowerDst.endsWith(".jar")) {
-          RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY);
-        } else if (lowerDst.endsWith(".zip")) {
-          FileUtil.unZip(inputStream, dst);
-        } else if (lowerDst.endsWith(".tar.gz") ||
-            lowerDst.endsWith(".tgz") ||
-            lowerDst.endsWith(".tar")) {
-          FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
-        } else {
-          LOG.warn("Cannot unpack " + source);
-          try (OutputStream outputStream =
-                   destinationFileSystem.create(destination, true)) {
-            IOUtils.copy(inputStream, outputStream);
-          }
+      if (lowerDst.endsWith(".jar")) {
+        String p = resource.getPattern();
+        RunJar.unJar(localrsrc, dst,
+            p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
+        File newDst = new File(dst, dst.getName());
+        if (!dst.exists() && !dst.mkdir()) {
+          throw new IOException("Unable to create directory: [" + dst + "]");
         }
-        break;
-      case PATTERN:
-        if (lowerDst.endsWith(".jar")) {
-          String p = resource.getPattern();
-          if (!dst.exists() && !dst.mkdir()) {
-            throw new IOException("Unable to create directory: [" + dst + "]");
-          }
-          RunJar.unJarAndSave(inputStream, dst, source.getName(),
-              p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
-        } else if (lowerDst.endsWith(".zip")) {
-          LOG.warn("Treating [" + source + "] as an archive even though it " +
-              "was specified as PATTERN");
-          FileUtil.unZip(inputStream, dst);
-        } else if (lowerDst.endsWith(".tar.gz") ||
-            lowerDst.endsWith(".tgz") ||
-            lowerDst.endsWith(".tar")) {
-          LOG.warn("Treating [" + source + "] as an archive even though it " +
-              "was specified as PATTERN");
-          FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
-        } else {
-          LOG.warn("Cannot unpack " + source);
-          try (OutputStream outputStream =
-                   destinationFileSystem.create(destination, true)) {
-            IOUtils.copy(inputStream, outputStream);
-          }
+        if (!localrsrc.renameTo(newDst)) {
+          throw new IOException("Unable to rename file: [" + localrsrc
+              + "] to [" + newDst + "]");
         }
-        break;
-      case FILE:
-      default:
-        try (OutputStream outputStream =
-                 destinationFileSystem.create(destination, true)) {
-          IOUtils.copy(inputStream, outputStream);
+      } else if (lowerDst.endsWith(".zip")) {
+        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
+        		"was specified as PATTERN");
+        FileUtil.unZip(localrsrc, dst);
+      } else if (lowerDst.endsWith(".tar.gz") ||
+                 lowerDst.endsWith(".tgz") ||
+                 lowerDst.endsWith(".tar")) {
+        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
+        "was specified as PATTERN");
+        FileUtil.unTar(localrsrc, dst);
+      } else {
+        LOG.warn("Cannot unpack " + localrsrc);
+        if (!localrsrc.renameTo(dst)) {
+          throw new IOException("Unable to rename file: [" + localrsrc
+              + "] to [" + dst + "]");
         }
-        break;
       }
-      // TODO Should calculate here before returning
-      //return FileUtil.getDU(destDir);
     }
+    break;
+    case FILE:
+    default:
+      if (!localrsrc.renameTo(dst)) {
+        throw new IOException("Unable to rename file: [" + localrsrc
+          + "] to [" + dst + "]");
+      }
+      break;
+    }
+    if(localrsrc.isFile()){
+      try {
+        files.delete(new Path(localrsrc.toString()), false);
+      } catch (IOException ignore) {
+      }
+    }
+    return 0;
+    // TODO Should calculate here before returning
+    //return FileUtil.getDU(destDir);
   }
 
   @Override
@@ -394,34 +352,27 @@ public class FSDownload implements Callable<Path> {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Starting to download %s %s %s",
-          sCopy,
-          resource.getType(),
-          resource.getPattern()));
+      LOG.debug("Starting to download " + sCopy);
     }
 
-    final Path destinationTmp = new Path(destDirPath + "_tmp");
-    createDir(destinationTmp, PRIVATE_DIR_PERMS);
-    Path dFinal =
-        files.makeQualified(new Path(destinationTmp, sCopy.getName()));
+    createDir(destDirPath, cachePerms);
+    final Path dst_work = new Path(destDirPath + "_tmp");
+    createDir(dst_work, cachePerms);
+    Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
     try {
-      if (userUgi == null) {
-        verifyAndCopy(dFinal);
-      } else {
-        userUgi.doAs(new PrivilegedExceptionAction<Void>() {
-          @Override
-          public Void run() throws Exception {
-            verifyAndCopy(dFinal);
-            return null;
-          }
-        });
-      }
+      Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
+          : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
+            public Path run() throws Exception {
+              return files.makeQualified(copy(sCopy, dst_work));
+            };
+          });
+      unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
       changePermissions(dFinal.getFileSystem(conf), dFinal);
-      files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
+      files.rename(dst_work, destDirPath, Rename.OVERWRITE);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("File has been downloaded to %s from %s",
-            new Path(destDirPath, sCopy.getName()), sCopy));
+        LOG.debug("File has been downloaded to " +
+            new Path(destDirPath, sCopy.getName()));
       }
     } catch (Exception e) {
       try {
@@ -431,7 +382,7 @@ public class FSDownload implements Callable<Path> {
       throw e;
     } finally {
       try {
-        files.delete(destinationTmp, true);
+        files.delete(dst_work, true);
       } catch (FileNotFoundException ignore) {
       }
       conf = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/901d15a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index fa8c039..877dd08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -82,9 +82,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
-/**
- * Unit test for the FSDownload class.
- */
 public class TestFSDownload {
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -93,8 +90,7 @@ public class TestFSDownload {
   private enum TEST_FILE_TYPE {
     TAR, JAR, ZIP, TGZ
   };
-  private Configuration conf = new Configuration();
-
+  
   @AfterClass
   public static void deleteTestDir() throws IOException {
     FileContext fs = FileContext.getLocalFSFileContext();
@@ -136,18 +132,6 @@ public class TestFSDownload {
     FileOutputStream stream = new FileOutputStream(jarFile);
     LOG.info("Create jar out stream ");
     JarOutputStream out = new JarOutputStream(stream, new Manifest());
-    ZipEntry entry = new ZipEntry("classes/1.class");
-    out.putNextEntry(entry);
-    out.write(1);
-    out.write(2);
-    out.write(3);
-    out.closeEntry();
-    ZipEntry entry2 = new ZipEntry("classes/2.class");
-    out.putNextEntry(entry2);
-    out.write(1);
-    out.write(2);
-    out.write(3);
-    out.closeEntry();
     LOG.info("Done writing jar stream ");
     out.close();
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@@ -272,6 +256,7 @@ public class TestFSDownload {
   @Test (timeout=10000)
   public void testDownloadBadPublic() throws IOException, URISyntaxException,
       InterruptedException {
+    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -322,6 +307,7 @@ public class TestFSDownload {
   @Test (timeout=60000)
   public void testDownloadPublicWithStatCache() throws IOException,
       URISyntaxException, InterruptedException, ExecutionException {
+    final Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     Path basedir = files.makeQualified(new Path("target",
       TestFSDownload.class.getSimpleName()));
@@ -396,6 +382,7 @@ public class TestFSDownload {
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
+    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -451,7 +438,7 @@ public class TestFSDownload {
         FileStatus status = files.getFileStatus(localized.getParent());
         FsPermission perm = status.getPermission();
         assertEquals("Cache directory permissions are incorrect",
-            new FsPermission((short)0700), perm);
+            new FsPermission((short)0755), perm);
 
         status = files.getFileStatus(localized);
         perm = status.getPermission();
@@ -468,6 +455,7 @@ public class TestFSDownload {
 
   private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, 
       URISyntaxException, InterruptedException{
+    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -542,7 +530,7 @@ public class TestFSDownload {
     }
   }
 
-  @Test (timeout=10000)
+  @Test (timeout=10000) 
   public void testDownloadArchive() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.TAR);
@@ -554,7 +542,7 @@ public class TestFSDownload {
     downloadWithFileType(TEST_FILE_TYPE.JAR);
   }
 
-  @Test (timeout=10000)
+  @Test (timeout=10000) 
   public void testDownloadArchiveZip() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.ZIP);
@@ -615,6 +603,7 @@ public class TestFSDownload {
 
   @Test (timeout=10000)
   public void testDirDownload() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
       TestFSDownload.class.getSimpleName()));
@@ -679,6 +668,7 @@ public class TestFSDownload {
 
   @Test (timeout=10000)
   public void testUniqueDestinationPath() throws Exception {
+    Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org