You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/20 12:19:39 UTC

svn commit: r639247 [3/3] - in /hadoop/core/trunk: ./ docs/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Mar 20 04:19:34 2008
@@ -137,11 +137,10 @@
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       File currentDir = new File(".").getAbsoluteFile();
-      File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
       if (new File(prog).isAbsolute()) {
         // we don't own it. Hope it is executable
       } else {
-        FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
+        FileUtil.chmod(new File(currentDir, prog).toString(), "a+x");
       }
 
       // 
@@ -153,7 +152,7 @@
       //
       if (!new File(argvSplit[0]).isAbsolute()) {
         PathFinder finder = new PathFinder("PATH");
-        finder.prependPathComponent(jobCacheDir.toString());
+        finder.prependPathComponent(currentDir.toString());
         File f = finder.getAbsolutePath(argvSplit[0]);
         if (f != null) {
           argvSplit[0] = f.getAbsolutePath();

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Thu Mar 20 04:19:34 2008
@@ -1062,6 +1062,33 @@
           <code>&lt;/property&gt;</code>
         </p>
         
+        <p>When the job starts, the localized job directory
+        <code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
+        has the following directories: </p>
+        <ul>
+        <li> A job-specific shared directory, created at location
+        <code>${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+        This directory is exposed to the users through 
+        <code>job.local.dir </code>. The tasks can use this space as scratch
+        space and share files among them. The directory can accessed through 
+        api <a href="ext:api/org/apache/hadoop/mapred/jobconf/getjoblocaldir">
+        JobConf.getJobLocalDir()</a>. It is available as System property also.
+        So,users can call <code>System.getProperty("job.local.dir")</code>;
+        </li>
+        <li>A jars directory, which has the job jar file and expanded jar </li>
+        <li>A job.xml file, the generic job configuration </li>
+        <li>Each task has directory <code>task-id</code> which again has the 
+        following structure
+        <ul>
+        <li>A job.xml file, task localized job configuration </li>
+        <li>A directory for intermediate output files</li>
+        <li>The working directory of the task. 
+        And work directory has a temporary directory 
+        to create temporary files</li>
+        </ul>
+        </li>
+        </ul>
+ 
         <p>The <a href="#DistributedCache">DistributedCache</a> can also be used
         as a rudimentary software distribution mechanism for use in the map 
         and/or reduce tasks. It can be used to distribute both jars and 

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml Thu Mar 20 04:19:34 2008
@@ -154,6 +154,7 @@
                 <setcompressmapoutput href="#setCompressMapOutput(boolean)" />
                 <setmapoutputcompressiontype href="#setMapOutputCompressionType(org.apache.hadoop.io.SequenceFile.CompressionType)" />
                 <setmapoutputcompressorclass href="#setMapOutputCompressorClass(java.lang.Class)" />
+                <getjoblocaldir href="#getJobLocalDir()" />
               </jobconf>
               <jobconfigurable href="JobConfigurable.html">
                 <configure href="#configure(org.apache.hadoop.mapred.JobConf)" />

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Thu Mar 20 04:19:34 2008
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -115,13 +116,14 @@
    * @param conf the jobconf
    * @throws IOException if something goes wrong writing
    */
-  private static void fillInMissingMapOutputs(FileSystem fs, 
+  private static void fillInMissingMapOutputs(FileSystem fs,
+                                              String jobId,
                                               String taskId,
                                               int numMaps,
                                               JobConf conf) throws IOException {
     Class keyClass = conf.getMapOutputKeyClass();
     Class valueClass = conf.getMapOutputValueClass();
-    MapOutputFile namer = new MapOutputFile();
+    MapOutputFile namer = new MapOutputFile(jobId);
     namer.setConf(conf);
     for(int i=0; i<numMaps; i++) {
       Path f = namer.getInputFile(i, taskId);
@@ -156,8 +158,13 @@
     
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
-    File taskDir = new File(jobFilename.getParent());
-    File workDirName = new File(taskDir.getParent(), "work");
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    File workDirName = new File(lDirAlloc.getLocalPathToRead(
+                                  TaskTracker.getJobCacheSubdir() 
+                                  + Path.SEPARATOR + jobId 
+                                  + Path.SEPARATOR + taskId
+                                  + Path.SEPARATOR + "work",
+                                  conf). toString());
     local.setWorkingDirectory(new Path(workDirName.toString()));
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
@@ -179,7 +186,7 @@
                          taskId, partition, splitClass, split);
     } else {
       int numMaps = conf.getNumMapTasks();
-      fillInMissingMapOutputs(local, taskId, numMaps, conf);
+      fillInMissingMapOutputs(local, jobId, taskId, numMaps, conf);
       task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId, 
                             partition, numMaps);
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Mar 20 04:19:34 2008
@@ -1334,6 +1334,25 @@
   public void setJobEndNotificationURI(String uri) {
     set("job.end.notification.url", uri);
   }
+
+  /**
+   * Get job-specific shared directory for use as scratch space
+   * 
+   * <p>
+   * When a job starts, a shared directory is created at location
+   * <code>
+   * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+   * This directory is exposed to the users through 
+   * <code>job.local.dir </code>.
+   * So, the tasks can use this space 
+   * as scratch space and share files among them. </p>
+   * This value is available as System property also.
+   * 
+   * @return The localized job specific shared directory
+   */
+  public String getJobLocalDir() {
+    return get("job.local.dir");
+  }
   
   /** 
    * Find a jar that contains a class of the same name, if any.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Mar 20 04:19:34 2008
@@ -78,7 +78,7 @@
     public Job(String jobid, JobConf conf) throws IOException {
       this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
       this.id = jobid;
-      this.mapoutputFile = new MapOutputFile();
+      this.mapoutputFile = new MapOutputFile(jobid);
       this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Mar 20 04:19:34 2008
@@ -30,6 +30,15 @@
 class MapOutputFile {
 
   private JobConf conf;
+  private String jobDir;
+  
+  MapOutputFile() {
+  }
+
+  MapOutputFile(String jobId) {
+    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+  }
+
   private LocalDirAllocator lDirAlloc = 
                             new LocalDirAllocator("mapred.local.dir");
   
@@ -38,7 +47,9 @@
    */
   public Path getOutputFile(String mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/file.out", conf);
   }
 
   /** Create a local map output file name.
@@ -47,7 +58,9 @@
    */
   public Path getOutputFileForWrite(String mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/file.out", size, conf);
   }
 
   /** Return the path to a local map output index file created earlier
@@ -55,7 +68,9 @@
    */
   public Path getOutputIndexFile(String mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/file.out.index", conf);
   }
 
   /** Create a local map output index file name.
@@ -64,7 +79,9 @@
    */
   public Path getOutputIndexFileForWrite(String mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index", 
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/file.out.index", 
                                           size, conf);
   }
 
@@ -74,8 +91,10 @@
    */
   public Path getSpillFile(String mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
-                                        conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/spill" 
+                                        + spillNumber + ".out", conf);
   }
 
   /** Create a local map spill file name.
@@ -85,9 +104,10 @@
    */
   public Path getSpillFileForWrite(String mapTaskId, int spillNumber, 
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(mapTaskId+
-                                                  "/spill" +spillNumber+".out",
-                                                  size, conf);
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/spill" + 
+                                          spillNumber + ".out", size, conf);
   }
 
   /** Return a local map spill index file created earlier
@@ -96,8 +116,10 @@
    */
   public Path getSpillIndexFile(String mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        mapTaskId+"/spill" +spillNumber+".out.index", conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/spill" + 
+                                        spillNumber + ".out.index", conf);
   }
 
   /** Create a local map spill index file name.
@@ -107,8 +129,10 @@
    */
   public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/spill" + spillNumber + 
+                                          ".out.index", size, conf);
   }
 
   /** Return a local reduce input file created earlier
@@ -118,7 +142,9 @@
   public Path getInputFile(int mapId, String reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        reduceTaskId + Path.SEPARATOR + 
+                                        "output" + "/map_" + mapId + ".out",
                                         conf);
   }
 
@@ -130,21 +156,16 @@
   public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          reduceTaskId + Path.SEPARATOR +
+                                          "output" + "/map_" + mapId + ".out",
                                           size, conf);
   }
 
   /** Removes all of the files related to a task. */
   public void removeAll(String taskId) throws IOException {
-    conf.deleteLocalFiles(taskId);
-  }
-
-  /** 
-   * Removes all contents of temporary storage.  Called upon 
-   * startup, to remove any leftovers from previous run.
-   */
-  public void cleanupStorage() throws IOException {
-    conf.deleteLocalFiles();
+    conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
+                          taskId + Path.SEPARATOR + "output");
   }
 
   public void setConf(Configuration conf) {
@@ -154,4 +175,9 @@
       this.conf = new JobConf(conf);
     }
   }
+  
+  public void setJobId(String jobId) {
+    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+  }
+
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Thu Mar 20 04:19:34 2008
@@ -48,14 +48,16 @@
   private int mapId;
   private String host;
   private int port;
+  private String jobId;
 
   /** RPC constructor **/
   public MapOutputLocation() {
   }
 
   /** Construct a location. */
-  public MapOutputLocation(String mapTaskId, int mapId, 
+  public MapOutputLocation(String jobId, String mapTaskId, int mapId, 
                            String host, int port) {
+    this.jobId = jobId;
     this.mapTaskId = mapTaskId;
     this.mapId = mapId;
     this.host = host;
@@ -80,22 +82,24 @@
   public int getPort() { return port; }
 
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, mapTaskId);
+    out.writeUTF(jobId);
+    out.writeUTF(mapTaskId);
     out.writeInt(mapId);
-    UTF8.writeString(out, host);
+    out.writeUTF(host);
     out.writeInt(port);
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.mapTaskId = UTF8.readString(in);
+    this.jobId = in.readUTF();
+    this.mapTaskId = in.readUTF();
     this.mapId = in.readInt();
-    this.host = UTF8.readString(in);
+    this.host = in.readUTF();
     this.port = in.readInt();
   }
 
   public String toString() {
-    return "http://" + host + ":" + port + "/mapOutput?map=" + 
-      mapTaskId;
+    return "http://" + host + ":" + port + "/mapOutput?job=" + jobId +
+           "&map=" + mapTaskId;
   }
   
   /**

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Mar 20 04:19:34 2008
@@ -46,7 +46,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InMemoryFileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -807,7 +806,10 @@
         // a temp filename. If this file gets created in ramfs, we're fine,
         // else, we will check the localFS to find a suitable final location
         // for this path
-        Path filename = new Path("/" + reduceId + "/map_" +
+        Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
+                                 Path.SEPARATOR + getJobId() +
+                                 Path.SEPARATOR + reduceId +
+                                 Path.SEPARATOR + "output" + "/map_" +
                                  loc.getMapId() + ".out");
         // a working filename that will be unique to this attempt
         Path tmpFilename = new Path(filename + "-" + id);
@@ -903,13 +905,7 @@
       // add the jars and directories to the classpath
       String jar = conf.getJar();
       if (jar != null) {      
-        LocalDirAllocator lDirAlloc = 
-                            new LocalDirAllocator("mapred.local.dir");
-        File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
-                                      TaskTracker.getJobCacheSubdir() 
-                                      + Path.SEPARATOR + getJobId() 
-                                      + Path.SEPARATOR  
-                                      + "work", conf).toString());
+        File jobCacheDir = new File(new Path(jar).getParent().toString());
 
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
@@ -1484,7 +1480,8 @@
               maxFetchRetriesPerMap = 
                   getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
             }
-            knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+            knownOutputs.add(new MapOutputLocation(reduceTask.getJobId(),
+                             taskId, mId, host, port));
           }
           break;
           case FAILED:

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Mar 20 04:19:34 2008
@@ -114,6 +114,7 @@
                                                     TaskStatus.Phase.MAP : 
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
+    this.mapOutputFile.setJobId(jobId);
   }
 
   ////////////////////////////////////////////
@@ -186,6 +187,7 @@
       taskOutputPath = null;
     }
     taskStatus.readFields(in);
+    this.mapOutputFile.setJobId(jobId); 
   }
 
   public String toString() { return taskId; }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Mar 20 04:19:34 2008
@@ -54,7 +54,7 @@
     this.t = t;
     this.tracker = tracker;
     this.conf = conf;
-    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile = new MapOutputFile(t.getJobId());
     this.mapOutputFile.setConf(conf);
   }
 
@@ -91,19 +91,20 @@
       
       //before preparing the job localize 
       //all the archives
-      File workDir = new File(t.getJobFile()).getParentFile();
       String taskid = t.getTaskId();
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File jobCacheDir = null;
-      try {
-        jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
-                                    TaskTracker.getJobCacheSubdir() 
-                                    + Path.SEPARATOR + t.getJobId() 
-                                    + Path.SEPARATOR  
-                                    + "work", conf).toString());
-      } catch (IOException ioe) {
-        LOG.warn("work directory doesnt exist");
+      if (conf.getJar() != null) {
+        jobCacheDir = new File(
+                          new Path(conf.getJar()).getParent().toString());
       }
+      File workDir = new File(lDirAlloc.getLocalPathToRead(
+                                TaskTracker.getJobCacheSubdir() 
+                                + Path.SEPARATOR + t.getJobId() 
+                                + Path.SEPARATOR + t.getTaskId()
+                                + Path.SEPARATOR + "work",
+                                conf). toString());
+
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       FileStatus fileStatus;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 20 04:19:34 2008
@@ -152,7 +152,6 @@
   private static final String JOBCACHE = "jobcache";
   private JobConf originalConf;
   private JobConf fConf;
-  private MapOutputFile mapOutputFile;
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
@@ -448,7 +447,7 @@
 
     // Clear out temporary files that might be lying around
     DistributedCache.purgeCache(this.fConf);
-    this.mapOutputFile.cleanupStorage();
+    cleanupStorage();
     this.justStarted = true;
 
     this.jobClient = (InterTrackerProtocol) 
@@ -465,7 +464,15 @@
                              taskTrackerName);
     mapEventsFetcher.start();
   }
-    
+  
+  /** 
+   * Removes all contents of temporary storage.  Called upon 
+   * startup, to remove any leftovers from previous run.
+   */
+  public void cleanupStorage() throws IOException {
+    this.fConf.deleteLocalFiles();
+  }
+
   // Object on wait which MapEventsFetcherThread is going to wait.
   private Object waitingOn = new Object();
 
@@ -638,13 +645,11 @@
         jarFileSize = -1;
       }
     }
-    // Here we check for double the size of jobfile to accommodate for
-    // localize task file and we check four times the size of jarFileSize to 
-    // accommodate for unjarring the jar file in work directory 
+
     Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
                                     + Path.SEPARATOR + jobId 
                                     + Path.SEPARATOR + "job.xml"),
-                                    2 * jobFileSize + 5 * jarFileSize, fConf);
+                                    jobFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
     synchronized (rjob) {
       if (!rjob.localized) {
@@ -667,18 +672,30 @@
         JobConf localJobConf = new JobConf(localJobFile);
         
         // create the 'work' directory
-        File workDir = new File(new File(localJobFile.toString()).getParent(),
-                                "work");
-        if (!workDir.mkdirs()) {
-          if (!workDir.isDirectory()) {
-            throw new IOException("Mkdirs failed to create " + workDir.toString());
-          }
+        // job-specific shared directory for use as scratch space 
+        Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+                       + Path.SEPARATOR + jobId 
+                       + Path.SEPARATOR + "work"), fConf);
+        if (!localFs.mkdirs(workDir)) {
+          throw new IOException("Mkdirs failed to create " 
+                      + workDir.toString());
         }
+        System.setProperty("job.local.dir", workDir.toString());
+        localJobConf.set("job.local.dir", workDir.toString());
         
-        // unjar the job.jar files in workdir
+        // copy Jar file to the local FS and unjar it.
         String jarFile = localJobConf.getJar();
         if (jarFile != null) {
-          localJarFile = new Path(jobDir,"job.jar");
+          // Here we check for and we check five times the size of jarFileSize
+          // to accommodate for unjarring the jar file in work directory 
+          localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
+                                     getJobCacheSubdir()
+                                     + Path.SEPARATOR + jobId 
+                                     + Path.SEPARATOR + "jars",
+                                     5 * jarFileSize, fConf), "job.jar");
+          if (!localFs.mkdirs(localJarFile.getParent())) {
+            throw new IOException("Mkdirs failed to create jars directory "); 
+          }
           fs.copyToLocalFile(new Path(jarFile), localJarFile);
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
@@ -687,8 +704,9 @@
           } finally {
             out.close();
           }
-
-          RunJar.unJar(new File(localJarFile.toString()), workDir);
+          // also unjar the job.jar files 
+          RunJar.unJar(new File(localJarFile.toString()),
+                       new File(localJarFile.getParent().toString()));
         }
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
@@ -763,7 +781,7 @@
     this.running = false;
         
     // Clear local storage
-    this.mapOutputFile.cleanupStorage();
+    cleanupStorage();
         
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
@@ -782,8 +800,6 @@
     maxCurrentReduceTasks = conf.getInt(
                   "mapred.tasktracker.reduce.tasks.maximum", 2);
     this.jobTrackAddr = JobTracker.getAddress(conf);
-    this.mapOutputFile = new MapOutputFile();
-    this.mapOutputFile.setConf(conf);
     String infoAddr = 
       NetUtils.getServerAddress(conf,
                                 "tasktracker.http.bindAddress", 
@@ -1370,7 +1386,11 @@
                     Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
                     task.getTaskId()), defaultJobConf );
       FileSystem localFs = FileSystem.getLocal(fConf);
-      
+      if (!localFs.mkdirs(localTaskDir)) {
+        throw new IOException("Mkdirs failed to create " 
+                    + localTaskDir.toString());
+      }
+
       // create symlink for ../work if it already doesnt exist
       String workDir = lDirAlloc.getLocalPathToRead(
                          TaskTracker.getJobCacheSubdir() 
@@ -1384,9 +1404,17 @@
         FileUtil.symLink(workDir, link);
       
       // create the working-directory of the task 
-      if (!localFs.mkdirs(localTaskDir)) {
-        throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
+      Path cwd = lDirAlloc.getLocalPathForWrite(
+                         TaskTracker.getJobCacheSubdir() 
+                         + Path.SEPARATOR + task.getJobId() 
+                         + Path.SEPARATOR + task.getTaskId()
+                         + Path.SEPARATOR + "work",
+                         defaultJobConf);
+      if (!localFs.mkdirs(cwd)) {
+        throw new IOException("Mkdirs failed to create " 
+                    + cwd.toString());
       }
+
       Path localTaskFile = new Path(localTaskDir, "job.xml");
       task.setJobFile(localTaskFile.toString());
       localJobConf.set("mapred.local.dir",
@@ -1598,7 +1626,19 @@
               } catch(IOException e){
                 LOG.warn("Exception finding task's stdout/err/syslog files");
               }
-              File workDir = new File(task.getJobFile()).getParentFile();
+              File workDir = null;
+              try {
+                workDir = new File(lDirAlloc.getLocalPathToRead(
+                                     TaskTracker.getJobCacheSubdir() 
+                                     + Path.SEPARATOR + task.getJobId() 
+                                     + Path.SEPARATOR + task.getTaskId()
+                                     + Path.SEPARATOR + "work",
+                                     localJobConf). toString());
+              } catch (IOException e) {
+                LOG.warn("Working Directory of the task " + task.getTaskId() +
+                		 "doesnt exist. Throws expetion " +
+                          StringUtils.stringifyException(e));
+              }
               // Build the command  
               File stdout = TaskLog.getTaskLogFile(task.getTaskId(),
                                                    TaskLog.LogName.DEBUGOUT);
@@ -2216,6 +2256,12 @@
                       ) throws ServletException, IOException {
       String mapId = request.getParameter("map");
       String reduceId = request.getParameter("reduce");
+      String jobId = request.getParameter("job");
+
+      if (jobId == null) {
+        throw new IOException("job parameter is required");
+      }
+
       if (mapId == null || reduceId == null) {
         throw new IOException("map and reduce parameters are required");
       }
@@ -2241,11 +2287,15 @@
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
-            mapId+"/file.out.index", conf);
+            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
+            jobId + Path.SEPARATOR +
+            mapId + "/output" + "/file.out.index", conf);
         
         // Map-output file
         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            mapId+"/file.out", conf);
+            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
+            jobId + Path.SEPARATOR +
+            mapId + "/output" + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Mar 20 04:19:34 2008
@@ -256,12 +256,14 @@
     private JobConf conf;
     private boolean compressInput;
     private String taskId;
+    private String jobId;
     private boolean first = true;
       
     public void configure(JobConf conf) {
       this.conf = conf;
       compressInput = conf.getCompressMapOutput();
       taskId = conf.get("mapred.task.id");
+      jobId = conf.get("mapred.job.id");
     }
       
     public void reduce(WritableComparable key, Iterator values,
@@ -269,7 +271,9 @@
                        ) throws IOException {
       if (first) {
         first = false;
-        Path input = conf.getLocalPath(taskId+"/map_0.out");
+        MapOutputFile mapOutputFile = new MapOutputFile(jobId);
+        mapOutputFile.setConf(conf);
+        Path input = mapOutputFile.getInputFile(0, taskId);
         FileSystem fs = FileSystem.get(conf);
         assertTrue("reduce input exists " + input, fs.exists(input));
         SequenceFile.Reader rdr = 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Thu Mar 20 04:19:34 2008
@@ -144,12 +144,14 @@
         String name = contents[fileIdx];
         if (!("taskTracker".equals(contents[fileIdx]))) {
           LOG.debug("Looking at " + name);
-          int idx = neededDirs.indexOf(name);
           assertTrue("Spurious directory " + name + " found in " +
-                     localDir, idx != -1);
-          assertTrue("Matching output directory not found " + name +
-                     " in " + trackerDir, 
-                     new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
+                     localDir, false);
+        }
+      }
+      for (int idx = 0; idx < neededDirs.size(); ++idx) {
+        String name = neededDirs.get(idx);
+        if (new File(new File(new File(trackerDir, "jobcache"),
+                              jobIds[idx]), name).isDirectory()) {
           found[idx] = true;
           numNotDel++;
         }