You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to yarn-commits@hadoop.apache.org by bo...@apache.org on 2013/03/25 16:46:25 UTC

svn commit: r1460723 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/

Author: bobby
Date: Mon Mar 25 15:46:25 2013
New Revision: 1460723

URL: http://svn.apache.org/r1460723
Log:
YARN-109. .tmp file is not deleted for localized archives (Mayank Bansal via bobby)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1460723&r1=1460722&r2=1460723&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Mar 25 15:46:25 2013
@@ -463,6 +463,9 @@ Release 0.23.7 - UNRELEASED
     YARN-345. Many InvalidStateTransitonException errors for ApplicationImpl
     in Node Manager (Robert Parker via jlowe)
 
+    YARN-109. .tmp file is not deleted for localized archives (Mayank Bansal 
+    via bobby)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1460723&r1=1460722&r2=1460723&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Mon Mar 25 15:46:25 2013
@@ -251,6 +251,12 @@ public class FSDownload implements Calla
       }
       break;
     }
+    if(localrsrc.isFile()){
+      try {
+        files.delete(new Path(localrsrc.toString()), false);
+      } catch (IOException ignore) {
+      }
+    }
     return 0;
     // TODO Should calculate here before returning
     //return FileUtil.getDU(destDir);
@@ -264,41 +270,41 @@ public class FSDownload implements Calla
     } catch (URISyntaxException e) {
       throw new IOException("Invalid resource", e);
     }
-
     Path tmp;
     do {
       tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
     } while (files.util().exists(tmp));
     destDirPath = tmp;
-
     createDir(destDirPath, cachePerms);
     final Path dst_work = new Path(destDirPath + "_tmp");
     createDir(dst_work, cachePerms);
-
     Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
     try {
-      Path dTmp = null == userUgi
-        ? files.makeQualified(copy(sCopy, dst_work))
-        : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
+      Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
+          : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
             public Path run() throws Exception {
               return files.makeQualified(copy(sCopy, dst_work));
             };
           });
       Pattern pattern = null;
       String p = resource.getPattern();
-      if(p != null) {
+      if (p != null) {
         pattern = Pattern.compile(p);
       }
       unpack(new File(dTmp.toUri()), new File(dFinal.toUri()), pattern);
       changePermissions(dFinal.getFileSystem(conf), dFinal);
       files.rename(dst_work, destDirPath, Rename.OVERWRITE);
     } catch (Exception e) {
-      try { files.delete(destDirPath, true); } catch (IOException ignore) { }
+      try {
+        files.delete(destDirPath, true);
+      } catch (IOException ignore) {
+      }
       throw e;
     } finally {
       try {
         files.delete(dst_work, true);
-      } catch (FileNotFoundException ignore) { }
+      } catch (FileNotFoundException ignore) {
+      }
       // clear ref to internal var
       rand = null;
       conf = null;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1460723&r1=1460722&r2=1460723&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Mon Mar 25 15:46:25 2013
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEqu
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -47,10 +48,12 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -113,7 +116,127 @@ public class TestFSDownload {
     return ret;
   }
   
-  @Test
+  static LocalResource createTarFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+
+    FSDataOutputStream outFile = null;
+    try {
+      byte[] bytes = new byte[len];
+      Path tarPath = new Path(p.toString());
+      outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      outFile.write(bytes);
+    } finally {
+      if (outFile != null)
+        outFile.close();
+    }
+    StringBuffer tarCommand = new StringBuffer();
+    URI u = new URI(p.getParent().toString());
+    tarCommand.append("cd '");
+    tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+    tarCommand.append("' ; ");
+    tarCommand.append("tar -czf " + p.getName() + ".tar " + p.getName());
+    String[] shellCmd = { "bash", "-c", tarCommand.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error untarring file " + p
+          + ". Tar process exited with exit code " + exitcode);
+    }
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".tar")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar"))
+        .getModificationTime());
+    return ret;
+  }
+  
+  static LocalResource createJarFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+
+    FSDataOutputStream outFile = null;
+    try {
+      byte[] bytes = new byte[len];
+      Path tarPath = new Path(p.toString());
+      outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      outFile.write(bytes);
+    } finally {
+      if (outFile != null)
+        outFile.close();
+    }
+    StringBuffer tarCommand = new StringBuffer();
+    URI u = new URI(p.getParent().toString());
+    tarCommand.append("cd '");
+    tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+    tarCommand.append("' ; ");
+    tarCommand.append("jar cf " + p.getName() + ".jar " + p.getName());
+    String[] shellCmd = { "bash", "-c", tarCommand.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error untarring file " + p
+          + ". Tar process exited with exit code " + exitcode);
+    }
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".jar")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar"))
+        .getModificationTime());
+    return ret;
+  }
+  
+  static LocalResource createZipFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+
+    FSDataOutputStream outFile = null;
+    try {
+      byte[] bytes = new byte[len];
+      Path tarPath = new Path(p.toString());
+      outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      outFile.write(bytes);
+    } finally {
+      if (outFile != null)
+        outFile.close();
+    }
+    StringBuffer zipCommand = new StringBuffer();
+    URI u = new URI(p.getParent().toString());
+    zipCommand.append("cd '");
+    zipCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+    zipCommand.append("' ; ");
+    zipCommand.append("gzip " + p.getName());
+    String[] shellCmd = { "bash", "-c", zipCommand.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error untarring file " + p
+          + ". Tar process exited with exit code " + exitcode);
+    }
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".zip")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".gz"))
+        .getModificationTime());
+    return ret;
+  }
+  
+  @Test (timeout=10000)
   public void testDownloadBadPublic() throws IOException, URISyntaxException,
       InterruptedException {
     Configuration conf = new Configuration();
@@ -161,7 +284,7 @@ public class TestFSDownload {
     }
   }
   
-  @Test
+  @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
     Configuration conf = new Configuration();
@@ -229,6 +352,175 @@ public class TestFSDownload {
     }
   }
   
+  @SuppressWarnings("deprecation")
+  @Test (timeout=10000) 
+  public void testDownloadArchive() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs = new LocalDirAllocator(
+        TestFSDownload.class.getName());
+
+    int size = rand.nextInt(512) + 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+    Path p = new Path(basedir, "" + 1);
+    LocalResource rsrc = createTarFile(files, p, size, rand, vis);
+    Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    FSDownload fsd = new FSDownload(files,
+        UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
+        new Random(sharedSeed));
+    pending.put(rsrc, exec.submit(fsd));
+    
+    try {
+      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+          basedir);
+      for (FileStatus filestatus : filesstatus) {
+        if (filestatus.isDir()) {
+          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+              filestatus.getPath());
+          for (FileStatus childfile : childFiles) {
+            if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) {
+              Assert.fail("Tmp File should not have been there "
+                  + childfile.getPath());
+            }
+          }
+        }
+      }
+    }catch (Exception e) {
+      throw new IOException("Failed exec", e);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test (timeout=10000) 
+  public void testDownloadPatternJar() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs = new LocalDirAllocator(
+        TestFSDownload.class.getName());
+
+    int size = rand.nextInt(512) + 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+    Path p = new Path(basedir, "" + 1);
+    LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
+    rsrcjar.setType(LocalResourceType.PATTERN);
+    Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    FSDownload fsdjar = new FSDownload(files,
+        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
+        new Random(sharedSeed));
+    pending.put(rsrcjar, exec.submit(fsdjar));
+
+    try {
+      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+          basedir);
+      for (FileStatus filestatus : filesstatus) {
+        if (filestatus.isDir()) {
+          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+              filestatus.getPath());
+          for (FileStatus childfile : childFiles) {
+            if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) {
+              Assert.fail("Tmp File should not have been there "
+                  + childfile.getPath());
+            }
+          }
+        }
+      }
+    }catch (Exception e) {
+      throw new IOException("Failed exec", e);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test (timeout=10000) 
+  public void testDownloadArchiveZip() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs = new LocalDirAllocator(
+        TestFSDownload.class.getName());
+
+    int size = rand.nextInt(512) + 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+    Path p = new Path(basedir, "" + 1);
+    LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
+    Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    FSDownload fsdzip = new FSDownload(files,
+        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
+        new Random(sharedSeed));
+    pending.put(rsrczip, exec.submit(fsdzip));
+
+    try {
+      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+          basedir);
+      for (FileStatus filestatus : filesstatus) {
+        if (filestatus.isDir()) {
+          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+              filestatus.getPath());
+          for (FileStatus childfile : childFiles) {
+            if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) {
+              Assert.fail("Tmp File should not have been there "
+                  + childfile.getPath());
+            }
+          }
+        }
+      }
+    }catch (Exception e) {
+      throw new IOException("Failed exec", e);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+  
   private void verifyPermsRecursively(FileSystem fs,
       FileContext files, Path p,
       LocalResourceVisibility vis) throws IOException {
@@ -261,7 +553,7 @@ public class TestFSDownload {
     }      
   }
   
-  @Test
+  @Test (timeout=10000)
   public void testDirDownload() throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);