Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:28:58 UTC

[19/37] hadoop git commit: YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi

YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b0f265d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b0f265d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b0f265d

Branch: refs/heads/HDFS-7240
Commit: 1b0f265db1a5bfccf1d870912237ea9618bd9c34
Parents: f2fa736
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jan 26 13:25:20 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Jan 26 13:25:20 2018 -0600

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileUtil.java     | 251 ++++++++++++++++++-
 .../java/org/apache/hadoop/util/RunJar.java     |  65 +++++
 .../org/apache/hadoop/yarn/util/FSDownload.java | 215 ++++++++++------
 .../apache/hadoop/yarn/util/TestFSDownload.java |  30 ++-
 4 files changed, 462 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index 4d971aa..bf9b146 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -20,27 +20,35 @@ package org.apache.hadoop.fs;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.charset.Charset;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
 
 import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -75,6 +83,11 @@ public class FileUtil {
   public static final int SYMLINK_NO_PRIVILEGE = 2;
 
   /**
+   * Buffer size for copying the content of a compressed file to a new file.
+   */
+  private static final int BUFFER_SIZE = 8_192;
+
+  /**
    * convert an array of FileStatus to an array of Path
    *
    * @param stats
@@ -526,6 +539,22 @@ public class FileUtil {
   }
 
   /**
+   * Convert an os-native filename to a path that works for the shell
+   * and avoids script injection attacks.
+   * @param file The filename to convert
+   * @return The unix pathname
+   * @throws IOException on Windows, there can be problems with the subprocess
+   */
+  public static String makeSecureShellPath(File file) throws IOException {
+    if (Shell.WINDOWS) {
+      // Currently it is never called, but it might be helpful in the future.
+      throw new UnsupportedOperationException("Not implemented for Windows");
+    } else {
+      return makeShellPath(file, false).replace("'", "'\\''");
+    }
+  }
+
+  /**
    * Convert a os-native filename to a path that works for the shell.
    * @param file The filename to convert
    * @param makeCanonicalPath
@@ -576,11 +605,48 @@ public class FileUtil {
   }
 
   /**
-   * Given a File input it will unzip the file in a the unzip directory
+   * Given a stream input it will unzip it in the unzip directory
+   * passed as the second parameter
+   * @param inputStream The zip file as input
+   * @param toDir The unzip directory where to unzip the zip file.
+   * @throws IOException an exception occurred
+   */
+  public static void unZip(InputStream inputStream, File toDir)
+      throws IOException {
+    try (ZipInputStream zip = new ZipInputStream(inputStream)) {
+      int numOfFailedLastModifiedSet = 0;
+      for(ZipEntry entry = zip.getNextEntry();
+          entry != null;
+          entry = zip.getNextEntry()) {
+        if (!entry.isDirectory()) {
+          File file = new File(toDir, entry.getName());
+          File parent = file.getParentFile();
+          if (!parent.mkdirs() &&
+              !parent.isDirectory()) {
+            throw new IOException("Mkdirs failed to create " +
+                parent.getAbsolutePath());
+          }
+          try (OutputStream out = new FileOutputStream(file)) {
+            IOUtils.copyBytes(zip, out, BUFFER_SIZE);
+          }
+          if (!file.setLastModified(entry.getTime())) {
+            numOfFailedLastModifiedSet++;
+          }
+        }
+      }
+      if (numOfFailedLastModifiedSet > 0) {
+        LOG.warn("Could not set last modified time for {} file(s)",
+            numOfFailedLastModifiedSet);
+      }
+    }
+  }
+
+  /**
+   * Given a File input it will unzip it in the unzip directory
    * passed as the second parameter
    * @param inFile The zip file as input
    * @param unzipDir The unzip directory where to unzip the zip file.
-   * @throws IOException
+   * @throws IOException An I/O exception has occurred
    */
   public static void unZip(File inFile, File unzipDir) throws IOException {
     Enumeration<? extends ZipEntry> entries;
@@ -621,6 +687,138 @@ public class FileUtil {
   }
 
   /**
+   * Run a command and send the contents of an input stream to it.
+   * @param inputStream Input stream to forward to the shell command
+   * @param command shell command to run
+   * @throws IOException read or write failed
+   * @throws InterruptedException command interrupted
+   * @throws ExecutionException task submit failed
+   */
+  private static void runCommandOnStream(
+      InputStream inputStream, String command)
+      throws IOException, InterruptedException, ExecutionException {
+    ExecutorService executor = null;
+    ProcessBuilder builder = new ProcessBuilder();
+    builder.command(
+        Shell.WINDOWS ? "cmd" : "bash",
+        Shell.WINDOWS ? "/c" : "-c",
+        command);
+    Process process = builder.start();
+    int exitCode;
+    try {
+      // Consume stdout and stderr, to avoid blocking the command
+      executor = Executors.newFixedThreadPool(2);
+      Future output = executor.submit(() -> {
+        try {
+          // Read until the output stream receives an EOF and closed.
+          if (LOG.isDebugEnabled()) {
+            // Log directly to avoid out of memory errors
+            try (BufferedReader reader =
+                     new BufferedReader(
+                         new InputStreamReader(process.getInputStream(),
+                             Charset.forName("UTF-8")))) {
+              String line;
+              while((line = reader.readLine()) != null) {
+                LOG.debug(line);
+              }
+            }
+          } else {
+            org.apache.commons.io.IOUtils.copy(
+                process.getInputStream(),
+                new IOUtils.NullOutputStream());
+          }
+        } catch (IOException e) {
+          LOG.debug(e.getMessage());
+        }
+      });
+      Future error = executor.submit(() -> {
+        try {
+          // Read until the error stream receives an EOF and closed.
+          if (LOG.isDebugEnabled()) {
+            // Log directly to avoid out of memory errors
+            try (BufferedReader reader =
+                     new BufferedReader(
+                         new InputStreamReader(process.getErrorStream(),
+                             Charset.forName("UTF-8")))) {
+              String line;
+              while((line = reader.readLine()) != null) {
+                LOG.debug(line);
+              }
+            }
+          } else {
+            org.apache.commons.io.IOUtils.copy(
+                process.getErrorStream(),
+                new IOUtils.NullOutputStream());
+          }
+        } catch (IOException e) {
+          LOG.debug(e.getMessage());
+        }
+      });
+
+      // Pass the input stream to the command to process
+      try {
+        org.apache.commons.io.IOUtils.copy(
+            inputStream, process.getOutputStream());
+      } finally {
+        process.getOutputStream().close();
+      }
+
+      // Wait for both stdout and stderr futures to finish
+      error.get();
+      output.get();
+    } finally {
+      // Clean up the threads
+      if (executor != null) {
+        executor.shutdown();
+      }
+      // Wait to avoid leaking the child process
+      exitCode = process.waitFor();
+    }
+
+    if (exitCode != 0) {
+      throw new IOException(
+          String.format(
+              "Error executing command. %s " +
+                  "Process exited with exit code %d.",
+              command, exitCode));
+    }
+  }
+
+  /**
+   * Given a Tar File as input it will untar the file in the untar directory
+   * passed as the second parameter
+   *
+   * This utility will untar ".tar" files and ".tar.gz","tgz" files.
+   *
+   * @param inputStream The tar file as input.
+   * @param untarDir The untar directory where to untar the tar file.
+   * @param gzipped The input stream is gzipped
+   *                TODO Use magic number and PushbackInputStream to identify
+   * @throws IOException an exception occurred
+   * @throws InterruptedException command interrupted
+   * @throws ExecutionException task submit failed
+   */
+  public static void unTar(InputStream inputStream, File untarDir,
+                           boolean gzipped)
+      throws IOException, InterruptedException, ExecutionException {
+    if (!untarDir.mkdirs()) {
+      if (!untarDir.isDirectory()) {
+        throw new IOException("Mkdirs failed to create " + untarDir);
+      }
+    }
+
+    if(Shell.WINDOWS) {
+      // Tar is not native to Windows. Use simple Java based implementation for
+      // tests and simple tar archives
+      unTarUsingJava(inputStream, untarDir, gzipped);
+    } else {
+      // spawn tar utility to untar archive for full fledged unix behavior such
+      // as resolving symlinks in tar archives
+      unTarUsingTar(inputStream, untarDir, gzipped);
+    }
+  }
+
+  /**
    * Given a Tar File as input it will untar the file in a the untar directory
    * passed as the second parameter
    *
@@ -650,23 +848,41 @@ public class FileUtil {
     }
   }
 
+  private static void unTarUsingTar(InputStream inputStream, File untarDir,
+                                    boolean gzipped)
+      throws IOException, InterruptedException, ExecutionException {
+    StringBuilder untarCommand = new StringBuilder();
+    if (gzipped) {
+      untarCommand.append("gzip -dc | (");
+    }
+    untarCommand.append("cd '");
+    untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
+    untarCommand.append("' && ");
+    untarCommand.append("tar -x ");
+
+    if (gzipped) {
+      untarCommand.append(")");
+    }
+    runCommandOnStream(inputStream, untarCommand.toString());
+  }
+
   private static void unTarUsingTar(File inFile, File untarDir,
       boolean gzipped) throws IOException {
     StringBuffer untarCommand = new StringBuffer();
     if (gzipped) {
       untarCommand.append(" gzip -dc '");
-      untarCommand.append(FileUtil.makeShellPath(inFile));
+      untarCommand.append(FileUtil.makeSecureShellPath(inFile));
       untarCommand.append("' | (");
     }
     untarCommand.append("cd '");
-    untarCommand.append(FileUtil.makeShellPath(untarDir));
-    untarCommand.append("' ; ");
+    untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
+    untarCommand.append("' && ");
     untarCommand.append("tar -xf ");
 
     if (gzipped) {
       untarCommand.append(" -)");
     } else {
-      untarCommand.append(FileUtil.makeShellPath(inFile));
+      untarCommand.append(FileUtil.makeSecureShellPath(inFile));
     }
     String[] shellCmd = { "bash", "-c", untarCommand.toString() };
     ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
@@ -701,6 +917,29 @@ public class FileUtil {
     }
   }
 
+  private static void unTarUsingJava(InputStream inputStream, File untarDir,
+                                     boolean gzipped) throws IOException {
+    TarArchiveInputStream tis = null;
+    try {
+      if (gzipped) {
+        inputStream = new BufferedInputStream(new GZIPInputStream(
+            inputStream));
+      } else {
+        inputStream =
+            new BufferedInputStream(inputStream);
+      }
+
+      tis = new TarArchiveInputStream(inputStream);
+
+      for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
+        unpackEntries(tis, entry, untarDir);
+        entry = tis.getNextTarEntry();
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, tis, inputStream);
+    }
+  }
+
   private static void unpackEntries(TarArchiveInputStream tis,
       TarArchiveEntry entry, File outputDir) throws IOException {
     if (entry.isDirectory()) {
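
To make the pipe-based flow above concrete, here is a minimal standalone sketch in the same spirit as runCommandOnStream/unTarUsingTar: the archive bytes are written to the stdin of a "gzip -dc | (cd ... && tar -x)" subprocess instead of being staged on local disk first. The class and method names are placeholders, error handling is simplified (stdout/stderr are inherited here rather than drained on a two-thread executor as in the patch), and the destination path is assumed to contain no single quotes (the patch escapes them via makeSecureShellPath).

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative sketch only: stream an archive into "tar -x" without a temp file. */
public class PipedUntarSketch {

  public static void untar(InputStream archive, Path targetDir, boolean gzipped)
      throws IOException, InterruptedException {
    Files.createDirectories(targetDir);
    // Same shape as the command built in unTarUsingTar: optionally gunzip the
    // piped bytes, then untar them inside the destination directory.
    String command = (gzipped ? "gzip -dc | " : "")
        + "(cd '" + targetDir.toAbsolutePath() + "' && tar -x)";
    Process process = new ProcessBuilder("bash", "-c", command)
        .redirectErrorStream(true)
        .redirectOutput(ProcessBuilder.Redirect.INHERIT)
        .start();
    // Feed the archive into the subprocess's stdin, then close it so tar sees EOF.
    byte[] buffer = new byte[8192];
    try (OutputStream stdin = process.getOutputStream()) {
      int read;
      while ((read = archive.read(buffer)) != -1) {
        stdin.write(buffer, 0, read);
      }
    }
    int exitCode = process.waitFor();
    if (exitCode != 0) {
      throw new IOException("untar command failed with exit code " + exitCode);
    }
  }

  public static void main(String[] args) throws Exception {
    // Usage (hypothetical): java PipedUntarSketch /path/app.tar.gz /tmp/out
    try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
      untar(in, Paths.get(args[1]),
          args[0].endsWith(".gz") || args[0].endsWith(".tgz"));
    }
  }
}

The point of the pipe is that decompression and extraction start while the bytes are still arriving, so the archive does not have to be written to local disk once as a file and then read back a second time to unpack it.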

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
index 19b51ad..89b7d76 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
@@ -34,9 +34,11 @@ import java.util.Enumeration;
 import java.util.List;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
 import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileUtil;
@@ -98,6 +100,69 @@ public class RunJar {
    * Unpack matching files from a jar. Entries inside the jar that do
    * not match the given pattern will be skipped.
    *
+   * @param inputStream the jar stream to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param unpackRegex the pattern to match jar entries against
+   *
+   * @throws IOException if an I/O error has occurred or toDir
+   * cannot be created and does not already exist
+   */
+  public static void unJar(InputStream inputStream, File toDir,
+                           Pattern unpackRegex)
+      throws IOException {
+    try (JarInputStream jar = new JarInputStream(inputStream)) {
+      int numOfFailedLastModifiedSet = 0;
+      for (JarEntry entry = jar.getNextJarEntry();
+           entry != null;
+           entry = jar.getNextJarEntry()) {
+        if (!entry.isDirectory() &&
+            unpackRegex.matcher(entry.getName()).matches()) {
+          File file = new File(toDir, entry.getName());
+          ensureDirectory(file.getParentFile());
+          try (OutputStream out = new FileOutputStream(file)) {
+            IOUtils.copyBytes(jar, out, BUFFER_SIZE);
+          }
+          if (!file.setLastModified(entry.getTime())) {
+            numOfFailedLastModifiedSet++;
+          }
+        }
+      }
+      if (numOfFailedLastModifiedSet > 0) {
+        LOG.warn("Could not set last modified time for {} file(s)",
+            numOfFailedLastModifiedSet);
+      }
+    }
+  }
+
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped. Also keeps a copy
+   * of the entire jar in the same directory for backward compatibility.
+   * TODO remove this feature in a new release and do only unJar
+   *
+   * @param inputStream the jar stream to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param unpackRegex the pattern to match jar entries against
+   *
+   * @throws IOException if an I/O error has occurred or toDir
+   * cannot be created and does not already exist
+   */
+  @Deprecated
+  public static void unJarAndSave(InputStream inputStream, File toDir,
+                           String name, Pattern unpackRegex)
+      throws IOException{
+    File file = new File(toDir, name);
+    ensureDirectory(toDir);
+    try (OutputStream jar = new FileOutputStream(file);
+         TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) {
+      unJar(teeInputStream, toDir, unpackRegex);
+    }
+  }
+
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped.
+   *
    * @param jarFile the .jar file to unpack
    * @param toDir the destination directory into which to unpack the jar
    * @param unpackRegex the pattern to match jar entries against
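
The unJarAndSave path above relies on commons-io's TeeInputStream so the jar bytes are written back to disk while the very same stream is being unpacked. A self-contained sketch of that pattern follows; the class and method names are made up for illustration, and the final drain loop (so the saved copy also receives the trailing bytes of the jar) is an addition of this sketch rather than a transcription of the patch.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;

import org.apache.commons.io.input.TeeInputStream;

/** Illustrative sketch only: unpack a jar stream while keeping a full copy on disk. */
public final class TeeUnjarSketch {

  public static void unjarAndSave(InputStream jarBytes, File toDir, String savedName)
      throws IOException {
    if (!toDir.mkdirs() && !toDir.isDirectory()) {
      throw new IOException("Mkdirs failed to create " + toDir);
    }
    byte[] buffer = new byte[8192];
    try (OutputStream copy = new FileOutputStream(new File(toDir, savedName));
         // Every byte read while unpacking is duplicated into the saved copy.
         TeeInputStream tee = new TeeInputStream(jarBytes, copy);
         JarInputStream jar = new JarInputStream(tee)) {
      for (JarEntry entry = jar.getNextJarEntry(); entry != null;
           entry = jar.getNextJarEntry()) {
        if (entry.isDirectory()) {
          continue;
        }
        File out = new File(toDir, entry.getName());
        File parent = out.getParentFile();
        if (!parent.mkdirs() && !parent.isDirectory()) {
          throw new IOException("Mkdirs failed to create " + parent);
        }
        try (OutputStream fileOut = new FileOutputStream(out)) {
          int read;
          while ((read = jar.read(buffer)) != -1) {
            fileOut.write(buffer, 0, read);
          }
        }
      }
      // Drain whatever the JarInputStream did not consume so the tee'd copy
      // ends up byte-complete.
      while (tee.read(buffer) != -1) {
        // discard; TeeInputStream has already forwarded these bytes to "copy"
      }
    }
  }
}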

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 6e59574..1a60948 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.util;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
@@ -29,6 +31,7 @@ import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -54,6 +57,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 /**
  * Download a single URL to the local disk.
@@ -247,9 +251,21 @@ public class FSDownload implements Callable<Path> {
     }
   }
 
-  private Path copy(Path sCopy, Path dstdir) throws IOException {
+  /**
+   * Localize files.
+   * @param destination destination directory
+   * @throws IOException cannot read or write file
+   * @throws YarnException subcommand returned an error
+   */
+  private void verifyAndCopy(Path destination)
+      throws IOException, YarnException {
+    final Path sCopy;
+    try {
+      sCopy = resource.getResource().toPath();
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid resource", e);
+    }
     FileSystem sourceFs = sCopy.getFileSystem(conf);
-    Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
     FileStatus sStat = sourceFs.getFileStatus(sCopy);
     if (sStat.getModificationTime() != resource.getTimestamp()) {
       throw new IOException("Resource " + sCopy +
@@ -264,82 +280,108 @@ public class FSDownload implements Callable<Path> {
       }
     }
 
-    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
-        true, conf);
-    return dCopy;
+    downloadAndUnpack(sCopy, destination);
   }
 
-  private long unpack(File localrsrc, File dst) throws IOException {
-    switch (resource.getType()) {
-    case ARCHIVE: {
-      String lowerDst = StringUtils.toLowerCase(dst.getName());
-      if (lowerDst.endsWith(".jar")) {
-        RunJar.unJar(localrsrc, dst);
-      } else if (lowerDst.endsWith(".zip")) {
-        FileUtil.unZip(localrsrc, dst);
-      } else if (lowerDst.endsWith(".tar.gz") ||
-                 lowerDst.endsWith(".tgz") ||
-                 lowerDst.endsWith(".tar")) {
-        FileUtil.unTar(localrsrc, dst);
+  /**
+   * Copy source path to destination with localization rules.
+   * @param source source path to copy. Typically HDFS
+   * @param destination destination path. Typically local filesystem
+   * @exception YarnException Any error has occurred
+   */
+  private void downloadAndUnpack(Path source, Path destination)
+      throws YarnException {
+    try {
+      FileSystem sourceFileSystem = source.getFileSystem(conf);
+      FileSystem destinationFileSystem = destination.getFileSystem(conf);
+      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+        FileUtil.copy(
+            sourceFileSystem, source,
+            destinationFileSystem, destination, false,
+            true, conf);
       } else {
-        LOG.warn("Cannot unpack " + localrsrc);
-        if (!localrsrc.renameTo(dst)) {
-            throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + dst + "]");
-        }
+        unpack(source, destination, sourceFileSystem, destinationFileSystem);
       }
+    } catch (Exception e) {
+      throw new YarnException("Download and unpack failed", e);
     }
-    break;
-    case PATTERN: {
+  }
+
+  /**
+   * Do the localization action on the input stream.
+   * We use the deprecated method RunJar.unJarAndSave for compatibility reasons.
+   * We should use the more efficient RunJar.unJar in the future.
+   * @param source Source path
+   * @param destination Destination path
+   * @param sourceFileSystem Source filesystem
+   * @param destinationFileSystem Destination filesystem
+   * @throws IOException Could not read or write stream
+   * @throws InterruptedException Operation interrupted by caller
+   * @throws ExecutionException Could not create thread pool execution
+   */
+  @SuppressWarnings("deprecation")
+  private void unpack(Path source, Path destination,
+                      FileSystem sourceFileSystem,
+                      FileSystem destinationFileSystem)
+      throws IOException, InterruptedException, ExecutionException {
+    try (InputStream inputStream = sourceFileSystem.open(source)) {
+      File dst = new File(destination.toUri());
       String lowerDst = StringUtils.toLowerCase(dst.getName());
-      if (lowerDst.endsWith(".jar")) {
-        String p = resource.getPattern();
-        RunJar.unJar(localrsrc, dst,
-            p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
-        File newDst = new File(dst, dst.getName());
-        if (!dst.exists() && !dst.mkdir()) {
-          throw new IOException("Unable to create directory: [" + dst + "]");
+      switch (resource.getType()) {
+      case ARCHIVE:
+        if (lowerDst.endsWith(".jar")) {
+          RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY);
+        } else if (lowerDst.endsWith(".zip")) {
+          FileUtil.unZip(inputStream, dst);
+        } else if (lowerDst.endsWith(".tar.gz") ||
+            lowerDst.endsWith(".tgz") ||
+            lowerDst.endsWith(".tar")) {
+          FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
+        } else {
+          LOG.warn("Cannot unpack " + source);
+          try (OutputStream outputStream =
+                   destinationFileSystem.create(destination, true)) {
+            IOUtils.copy(inputStream, outputStream);
+          }
         }
-        if (!localrsrc.renameTo(newDst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + newDst + "]");
+        break;
+      case PATTERN:
+        if (lowerDst.endsWith(".jar")) {
+          String p = resource.getPattern();
+          if (!dst.exists() && !dst.mkdir()) {
+            throw new IOException("Unable to create directory: [" + dst + "]");
+          }
+          RunJar.unJarAndSave(inputStream, dst, source.getName(),
+              p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
+        } else if (lowerDst.endsWith(".zip")) {
+          LOG.warn("Treating [" + source + "] as an archive even though it " +
+              "was specified as PATTERN");
+          FileUtil.unZip(inputStream, dst);
+        } else if (lowerDst.endsWith(".tar.gz") ||
+            lowerDst.endsWith(".tgz") ||
+            lowerDst.endsWith(".tar")) {
+          LOG.warn("Treating [" + source + "] as an archive even though it " +
+              "was specified as PATTERN");
+          FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
+        } else {
+          LOG.warn("Cannot unpack " + source);
+          try (OutputStream outputStream =
+                   destinationFileSystem.create(destination, true)) {
+            IOUtils.copy(inputStream, outputStream);
+          }
         }
-      } else if (lowerDst.endsWith(".zip")) {
-        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
-        		"was specified as PATTERN");
-        FileUtil.unZip(localrsrc, dst);
-      } else if (lowerDst.endsWith(".tar.gz") ||
-                 lowerDst.endsWith(".tgz") ||
-                 lowerDst.endsWith(".tar")) {
-        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
-        "was specified as PATTERN");
-        FileUtil.unTar(localrsrc, dst);
-      } else {
-        LOG.warn("Cannot unpack " + localrsrc);
-        if (!localrsrc.renameTo(dst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + dst + "]");
+        break;
+      case FILE:
+      default:
+        try (OutputStream outputStream =
+                 destinationFileSystem.create(destination, true)) {
+          IOUtils.copy(inputStream, outputStream);
         }
+        break;
       }
+      // TODO Should calculate here before returning
+      //return FileUtil.getDU(destDir);
     }
-    break;
-    case FILE:
-    default:
-      if (!localrsrc.renameTo(dst)) {
-        throw new IOException("Unable to rename file: [" + localrsrc
-          + "] to [" + dst + "]");
-      }
-      break;
-    }
-    if(localrsrc.isFile()){
-      try {
-        files.delete(new Path(localrsrc.toString()), false);
-      } catch (IOException ignore) {
-      }
-    }
-    return 0;
-    // TODO Should calculate here before returning
-    //return FileUtil.getDU(destDir);
   }
 
   @Override
@@ -352,27 +394,34 @@ public class FSDownload implements Callable<Path> {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting to download " + sCopy);
+      LOG.debug(String.format("Starting to download %s %s %s",
+          sCopy,
+          resource.getType(),
+          resource.getPattern()));
     }
 
-    createDir(destDirPath, cachePerms);
-    final Path dst_work = new Path(destDirPath + "_tmp");
-    createDir(dst_work, cachePerms);
-    Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
+    final Path destinationTmp = new Path(destDirPath + "_tmp");
+    createDir(destinationTmp, PRIVATE_DIR_PERMS);
+    Path dFinal =
+        files.makeQualified(new Path(destinationTmp, sCopy.getName()));
     try {
-      Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
-          : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
-            public Path run() throws Exception {
-              return files.makeQualified(copy(sCopy, dst_work));
-            };
-          });
-      unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
+      if (userUgi == null) {
+        verifyAndCopy(dFinal);
+      } else {
+        userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            verifyAndCopy(dFinal);
+            return null;
+          }
+        });
+      }
       changePermissions(dFinal.getFileSystem(conf), dFinal);
-      files.rename(dst_work, destDirPath, Rename.OVERWRITE);
+      files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("File has been downloaded to " +
-            new Path(destDirPath, sCopy.getName()));
+        LOG.debug(String.format("File has been downloaded to %s from %s",
+            new Path(destDirPath, sCopy.getName()), sCopy));
       }
     } catch (Exception e) {
       try {
@@ -382,7 +431,7 @@ public class FSDownload implements Callable<Path> {
       throw e;
     } finally {
       try {
-        files.delete(dst_work, true);
+        files.delete(destinationTmp, true);
       } catch (FileNotFoundException ignore) {
       }
       conf = null;
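
On the FSDownload side, the net effect is that a resource can be streamed from the source FileSystem straight into the new stream-based unpack helpers, with no tmp_ staging copy on local disk (the removed copy() used to create one). A rough usage sketch of that flow using only APIs added by this patch; the paths, the gzip detection, and the class name are placeholders, and only the tar case of the extension dispatch is shown.

import java.io.File;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch only: localize a remote tarball by piping it into unTar. */
public class StreamingLocalizeSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path source = new Path(args[0]);       // e.g. an hdfs:// archive (placeholder)
    File destination = new File(args[1]);  // local directory to unpack into
    FileSystem sourceFs = source.getFileSystem(conf);
    String name = source.getName().toLowerCase();
    // Hand the open stream directly to the stream-based FileUtil.unTar added
    // in this commit; the archive is never written to a local temp file.
    try (InputStream in = sourceFs.open(source)) {
      FileUtil.unTar(in, destination,
          name.endsWith(".gz") || name.endsWith(".tgz"));
    }
  }
}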

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b0f265d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 877dd08..fa8c039 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -82,6 +82,9 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
+/**
+ * Unit test for the FSDownload class.
+ */
 public class TestFSDownload {
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -90,7 +93,8 @@ public class TestFSDownload {
   private enum TEST_FILE_TYPE {
     TAR, JAR, ZIP, TGZ
   };
-  
+  private Configuration conf = new Configuration();
+
   @AfterClass
   public static void deleteTestDir() throws IOException {
     FileContext fs = FileContext.getLocalFSFileContext();
@@ -132,6 +136,18 @@ public class TestFSDownload {
     FileOutputStream stream = new FileOutputStream(jarFile);
     LOG.info("Create jar out stream ");
     JarOutputStream out = new JarOutputStream(stream, new Manifest());
+    ZipEntry entry = new ZipEntry("classes/1.class");
+    out.putNextEntry(entry);
+    out.write(1);
+    out.write(2);
+    out.write(3);
+    out.closeEntry();
+    ZipEntry entry2 = new ZipEntry("classes/2.class");
+    out.putNextEntry(entry2);
+    out.write(1);
+    out.write(2);
+    out.write(3);
+    out.closeEntry();
     LOG.info("Done writing jar stream ");
     out.close();
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@@ -256,7 +272,6 @@ public class TestFSDownload {
   @Test (timeout=10000)
   public void testDownloadBadPublic() throws IOException, URISyntaxException,
       InterruptedException {
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -307,7 +322,6 @@ public class TestFSDownload {
   @Test (timeout=60000)
   public void testDownloadPublicWithStatCache() throws IOException,
       URISyntaxException, InterruptedException, ExecutionException {
-    final Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     Path basedir = files.makeQualified(new Path("target",
       TestFSDownload.class.getSimpleName()));
@@ -382,7 +396,6 @@ public class TestFSDownload {
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -438,7 +451,7 @@ public class TestFSDownload {
         FileStatus status = files.getFileStatus(localized.getParent());
         FsPermission perm = status.getPermission();
         assertEquals("Cache directory permissions are incorrect",
-            new FsPermission((short)0755), perm);
+            new FsPermission((short)0700), perm);
 
         status = files.getFileStatus(localized);
         perm = status.getPermission();
@@ -455,7 +468,6 @@ public class TestFSDownload {
 
   private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, 
       URISyntaxException, InterruptedException{
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -530,7 +542,7 @@ public class TestFSDownload {
     }
   }
 
-  @Test (timeout=10000) 
+  @Test (timeout=10000)
   public void testDownloadArchive() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.TAR);
@@ -542,7 +554,7 @@ public class TestFSDownload {
     downloadWithFileType(TEST_FILE_TYPE.JAR);
   }
 
-  @Test (timeout=10000) 
+  @Test (timeout=10000)
   public void testDownloadArchiveZip() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.ZIP);
@@ -603,7 +615,6 @@ public class TestFSDownload {
 
   @Test (timeout=10000)
   public void testDirDownload() throws IOException, InterruptedException {
-    Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
       TestFSDownload.class.getSimpleName()));
@@ -668,7 +679,6 @@ public class TestFSDownload {
 
   @Test (timeout=10000)
   public void testUniqueDestinationPath() throws Exception {
-    Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));

