You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/12 23:24:11 UTC

svn commit: r486372 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Dec 12 14:24:09 2006
New Revision: 486372

URL: http://svn.apache.org/viewvc?view=rev&rev=486372
Log:
HADOOP-673.  Give each task its own working directory again.  Contributed by Mahadev.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Dec 12 14:24:09 2006
@@ -71,6 +71,9 @@
 21. HADOOP-792.  Fix 'dfs -mv' to return correct status.
     (Dhruba Borthakur via cutting) 
 
+22. HADOOP-673.  Give each task its own working directory again.
+    (Mahadev Konar via cutting)
+
 
 Release 0.9.1 - 2006-12-06
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Dec 12 14:24:09 2006
@@ -234,10 +234,12 @@
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       String userdir = System.getProperty("user.dir");
+      File currentDir = new File(".").getAbsoluteFile();
+      File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
       if (new File(prog).isAbsolute()) {
         // we don't own it. Hope it is executable
       } else {
-        new MustangFile(prog).setExecutable(true, true);
+        new MustangFile(new File(jobCacheDir, prog).toString()).setExecutable(true, true);
       }
 
       if (job_.getInputValueClass().equals(BytesWritable.class)) {
@@ -282,7 +284,7 @@
       //
       if (!new File(argvSplit[0]).isAbsolute()) {
           PathFinder finder = new PathFinder("PATH");
-          finder.prependPathComponent(".");
+          finder.prependPathComponent(jobCacheDir.toString());
           File f = finder.getAbsolutePath(argvSplit[0]);
           if (f != null) {
               argvSplit[0] = f.getAbsolutePath();

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Dec 12 14:24:09 2006
@@ -620,8 +620,8 @@
       boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
       if (!b)
         fail(LINK_URI);
-      DistributedCache.createSymlink(jobConf_);
     }
+    DistributedCache.createSymlink(jobConf_);
     // set the jobconf for the caching parameters
     if (cacheArchives != null)
       DistributedCache.setCacheArchives(archiveURIs, jobConf_);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Dec 12 14:24:09 2006
@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.net.URI;
@@ -108,6 +109,8 @@
     String cacheId = makeRelative(cache, conf);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+      if (lcacheStatus == null)
+        return;
       synchronized (lcacheStatus) {
         lcacheStatus.refcount--;
       }
@@ -320,7 +323,29 @@
 
     return digest;
   }
-
+  
+  /**
+   * This method creates symlinks for all files in a given dir in another directory
+   * @param conf the configuration
+   * @param jobCacheDir the target directory for creating symlinks
+   * @param workDir the directory in which the symlinks are created
+   * @throws IOException
+   */
+  public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
+  throws IOException{
+    if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+      return;
+    }
+    boolean createSymlink = getSymlink(conf);
+     if (createSymlink){
+       File[] list = jobCacheDir.listFiles();
+       for (int i=0; i < list.length; i++){
+         FileUtil.symLink(list[i].getAbsolutePath(),
+             new File(workDir, list[i].getName()).toString());
+       }
+     }  
+  }
+  
   private static String getFileSysName(URI url) {
     String fsname = url.getScheme();
     if ("dfs".equals(fsname)) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Dec 12 14:24:09 2006
@@ -43,6 +43,16 @@
             return false;
           }
         } else {
+          //try deleting the directory
+          // this might be a symlink
+          boolean b = false;
+          b = contents[i].delete();
+          if (b){
+            //this was indeed a symlink or an empty directory
+            continue;
+          }
+          // if not an empty directory or symlink let
+          // fullydelete handle it.
           if (! fullyDelete(contents[i])) {
             return false;
           }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Dec 12 14:24:09 2006
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
 import java.io.*;
 import java.util.Vector;
 import java.net.URI;
@@ -82,7 +83,8 @@
       
       //before preparing the job localize 
       //all the archives
-      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+      File workDir = new File(t.getJobFile()).getParentFile();
+      File jobCacheDir = new File(workDir.getParent(), "work");
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       if ((archives != null) || (files != null)) {
@@ -104,8 +106,6 @@
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
-        
-        // sets the paths to local archives and paths
         Path localTaskFile = new Path(t.getJobFile());
         FileSystem localFs = FileSystem.getNamed("local", conf);
         localFs.delete(localTaskFile);
@@ -116,6 +116,16 @@
           out.close();
         }
       }
+    
+      // create symlinks for all the files in job cache dir in current
+      // workingdir for streaming
+      try{
+        DistributedCache.createAllSymlink(conf, jobCacheDir, 
+            workDir);
+      } catch(IOException ie){
+        // Do not exit even if symlinks have not been created.
+        LOG.warn(StringUtils.stringifyException(ie));
+      }
       
       if (! prepare()) {
         return;
@@ -135,7 +145,7 @@
       String jar = conf.getJar();
       if (jar != null) {       
    	  // if the jar exists, copy it into workDir
-        File[] libs = new File(workDir, "lib").listFiles();
+        File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
             classPath.append(sep);            // add libs from jar to classpath
@@ -143,11 +153,13 @@
           }
         }
         classPath.append(sep);
-        classPath.append(new File(workDir, "classes"));
+        classPath.append(new File(jobCacheDir, "classes"));
         classPath.append(sep);
-        classPath.append(workDir);
+        classPath.append(jobCacheDir);
+       
       }
-
+      classPath.append(sep);
+      classPath.append(workDir);
      //  Build exec child jvm args.
       Vector vargs = new Vector(8);
       File jvm =                                  // use same jvm as parent