Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/08/12 18:17:49 UTC

svn commit: r803583 [2/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apa...

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Aug 12 16:17:47 2009
@@ -22,12 +22,12 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
@@ -90,7 +90,7 @@
 
     private JobStatus status;
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
-    private MapOutputFile mapoutputFile;
+
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
@@ -110,8 +110,6 @@
     public Job(JobID jobid, JobConf conf) throws IOException {
       this.file = new Path(getSystemDir(), jobid + "/job.xml");
       this.id = jobid;
-      this.mapoutputFile = new MapOutputFile(jobid);
-      this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
@@ -168,7 +166,9 @@
         }
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
-        
+
+        Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
+            new HashMap<TaskAttemptID, MapOutputFile>();
         for (int i = 0; i < rawSplits.length; i++) {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(
@@ -179,6 +179,12 @@
                                       rawSplits[i].getClassName(),
                                       rawSplits[i].getBytes(), 1);
             JobConf localConf = new JobConf(job);
+            TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+            MapOutputFile mapOutput = new MapOutputFile();
+            mapOutput.setConf(localConf);
+            mapOutputFiles.put(mapId, mapOutput);
+
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
             map.setConf(localConf);
@@ -196,14 +202,20 @@
           new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
         try {
           if (numReduceTasks > 0) {
+            ReduceTask reduce = new ReduceTask(file.toString(), 
+                reduceId, 0, mapIds.size(), 1);
+            JobConf localConf = new JobConf(job);
+            TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
               if (!this.isInterrupted()) {
                 TaskAttemptID mapId = mapIds.get(i);
-                Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-                Path reduceIn = this.mapoutputFile.getInputFileForWrite(
-                                  mapId.getTaskID(),reduceId,
-                                  localFs.getFileStatus(mapOut).getLen());
+                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+                MapOutputFile localOutputFile = new MapOutputFile();
+                localOutputFile.setConf(localConf);
+                Path reduceIn =
+                  localOutputFile.getInputFileForWrite(mapId.getTaskID(),
+                        localFs.getFileStatus(mapOut).getLen());
                 if (!localFs.mkdirs(reduceIn.getParent())) {
                   throw new IOException("Mkdirs failed to create "
                       + reduceIn.getParent().toString());
@@ -215,9 +227,6 @@
               }
             }
             if (!this.isInterrupted()) {
-              ReduceTask reduce = new ReduceTask(file.toString(), 
-                                                 reduceId, 0, mapIds.size(), 1);
-              JobConf localConf = new JobConf(job);
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
@@ -232,11 +241,8 @@
             }
           }
         } finally {
-          for (TaskAttemptID mapId: mapIds) {
-            this.mapoutputFile.removeAll(mapId);
-          }
-          if (numReduceTasks == 1) {
-            this.mapoutputFile.removeAll(reduceId);
+          for (MapOutputFile output : mapOutputFiles.values()) {
+            output.removeAll();
           }
         }
         // delete the temporary directory in output directory

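The LocalJobRunner change above replaces the single job-wide mapoutputFile with one MapOutputFile per map attempt, each bound to that attempt's localized configuration and torn down in the finally block. A minimal sketch of that bookkeeping pattern, using hypothetical stand-in classes rather than the real mapred types:

import java.util.HashMap;
import java.util.Map;

public class PerAttemptOutputSketch {

  // Hypothetical stand-in for MapOutputFile: each instance is bound to one
  // attempt-private directory, so its methods need no (jobId, taskId) args.
  static class AttemptOutput {
    private final String baseDir;
    AttemptOutput(String baseDir) { this.baseDir = baseDir; }
    String getOutputFile() { return baseDir + "/output/file.out"; }
    void removeAll() { System.out.println("cleaning " + baseDir + "/output"); }
  }

  public static void main(String[] args) {
    Map<String, AttemptOutput> mapOutputFiles =
        new HashMap<String, AttemptOutput>();
    try {
      // "Run" three map attempts, registering one helper per attempt.
      for (int i = 0; i < 3; i++) {
        String mapId = "attempt_local_0001_m_00000" + i + "_0";
        mapOutputFiles.put(mapId, new AttemptOutput("/tmp/local/" + mapId));
      }
      // The "reduce" reads each map's output through the attempt's own helper.
      for (Map.Entry<String, AttemptOutput> e : mapOutputFiles.entrySet()) {
        System.out.println(e.getKey() + " -> " + e.getValue().getOutputFile());
      }
    } finally {
      // Mirrors the patched finally block: every helper removes only its files.
      for (AttemptOutput out : mapOutputFiles.values()) {
        out.removeAll();
      }
    }
  }
}
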
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Wed Aug 12 16:17:47 2009
@@ -30,144 +30,152 @@
 class MapOutputFile {
 
   private JobConf conf;
-  private JobID jobId;
-  
-  MapOutputFile() {
-  }
 
-  MapOutputFile(JobID jobId) {
-    this.jobId = jobId;
+  static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
+  MapOutputFile() {
   }
 
   private LocalDirAllocator lDirAlloc = 
                             new LocalDirAllocator("mapred.local.dir");
   
-  /** Return the path to local map output file created earlier
-   * @param mapTaskId a map task id
-   */
-  public Path getOutputFile(TaskAttemptID mapTaskId)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out", conf);
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out", conf);
   }
 
-  /** Create a local map output file name.
-   * @param mapTaskId a map task id
+  /**
+   * Create a local map output file name.
+   * 
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
-    throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out", size, conf);
-  }
-
-  /** Return the path to a local map output index file created earlier
-   * @param mapTaskId a map task id
-   */
-  public Path getOutputIndexFile(TaskAttemptID mapTaskId)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out.index", conf);
-  }
-
-  /** Create a local map output index file name.
-   * @param mapTaskId a map task id
+  public Path getOutputFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out", size, conf);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out.index", conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
-    throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out.index", 
-                       size, conf);
+  public Path getOutputIndexFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out.index", size, conf);
   }
 
-  /** Return a local map spill file created earlier.
-   * @param mapTaskId a map task id
+  /**
+   * Return a local map spill file created earlier.
+   * 
    * @param spillNumber the number
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" 
-                       + spillNumber + ".out", conf);
+  public Path getSpillFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out", conf);
   }
 
-  /** Create a local map spill file name.
-   * @param mapTaskId a map task id
+  /**
+   * Create a local map spill file name.
+   * 
    * @param spillNumber the number
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, 
-         long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" + 
-                       spillNumber + ".out", size, conf);
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out", size, conf);
   }
 
-  /** Return a local map spill index file created earlier
-   * @param mapTaskId a map task id
+  /**
+   * Return a local map spill index file created earlier
+   * 
    * @param spillNumber the number
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" + 
-                       spillNumber + ".out.index", conf);
+  public Path getSpillIndexFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out.index", conf);
   }
 
-  /** Create a local map spill index file name.
-   * @param mapTaskId a map task id
+  /**
+   * Create a local map spill index file name.
+   * 
    * @param spillNumber the number
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
-         long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" + spillNumber + 
-                       ".out.index", size, conf);
-  }
-
-  /** Return a local reduce input file created earlier
-   * @param mapTaskId a map task id
-   * @param reduceTaskId a reduce task id
-   */
-  public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
-    throws IOException {
-    // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), reduceTaskId.toString())
-                       + "/map_" + mapId + ".out",
-                       conf);
-  }
-
-  /** Create a local reduce input file name.
-   * @param mapTaskId a map task id
-   * @param reduceTaskId a reduce task id
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out.index", size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
+            .valueOf(mapId)), conf);
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, 
-                                   long size)
-    throws IOException {
-    // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), reduceTaskId.toString())
-                       + "/map_" + mapId.getId() + ".out", 
-                       size, conf);
+  public Path getInputFileForWrite(TaskID mapId, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
+        size, conf);
   }
 
   /** Removes all of the files related to a task. */
-  public void removeAll(TaskAttemptID taskId) throws IOException {
-    conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
-                          jobId.toString(), taskId.toString())
-);
+  public void removeAll()
+      throws IOException {
+    conf.deleteLocalFiles(TaskTracker.OUTPUT);
   }
 
   public void setConf(Configuration conf) {
@@ -177,9 +185,4 @@
       this.conf = new JobConf(conf);
     }
   }
-  
-  public void setJobId(JobID jobId) {
-    this.jobId = jobId;
-  }
-
 }

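With the jobId/taskId parameters gone, every MapOutputFile path is now relative to the attempt's sandboxed mapred.local.dir, rooted at TaskTracker.OUTPUT. A sketch of the resulting naming scheme; the OUTPUT value below is an assumption for illustration, while the map_%d.out format string is taken from the patch:

public class OutputNamingSketch {
  // Assumed value; the real constant is TaskTracker.OUTPUT.
  static final String OUTPUT = "output";
  // Taken verbatim from the patched MapOutputFile.
  static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";

  public static void main(String[] args) {
    System.out.println(OUTPUT + "/file.out");                 // getOutputFile()
    System.out.println(OUTPUT + "/file.out.index");           // getOutputIndexFile()
    System.out.println(OUTPUT + "/spill" + 2 + ".out");       // getSpillFile(2)
    System.out.println(OUTPUT + "/spill" + 2 + ".out.index"); // getSpillIndexFile(2)
    // Reduce input for map #7 -- getInputFile(7) / getInputFileForWrite:
    System.out.println(String.format(REDUCE_INPUT_FILE_FORMAT_STRING, OUTPUT, 7));
  }
}
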
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Aug 12 16:17:47 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -66,7 +67,6 @@
    * The size of each record in the index file for the map-outputs.
    */
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-  
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
@@ -101,11 +101,21 @@
   }
 
   @Override
-  public void localizeConfiguration(JobConf conf) throws IOException {
+  public void localizeConfiguration(JobConf conf)
+      throws IOException {
     super.localizeConfiguration(conf);
-    if (isMapOrReduce()) {
-      Path localSplit = new Path(new Path(getJobFile()).getParent(), 
-                                 "split.dta");
+    // split.dta file is used only by IsolationRunner.
+    // Write the split file to the local disk if it is a normal map task (not a
+    // job-setup or a job-cleanup task) and if the user wishes to run
+    // IsolationRunner either by setting keep.failed.tasks.files to true or by
+    // using keep.tasks.files.pattern
+    if (isMapOrReduce()
+        && (conf.getKeepTaskFilesPattern() != null || conf
+            .getKeepFailedTaskFiles())) {
+      Path localSplit =
+          new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
+              TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
+                  .toString()), conf);
       LOG.debug("Writing local split to " + localSplit);
       DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
       Text.writeString(out, splitClass);
@@ -1220,8 +1230,8 @@
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
-        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-            numSpills, size);
+        final Path filename =
+            mapOutputFile.getSpillFileForWrite(numSpills, size);
         out = rfs.create(filename);
 
         final int endPosition = (kvend > kvstart)
@@ -1285,9 +1295,9 @@
 
         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
           // create spill index file
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          Path indexFilename =
+              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1313,8 +1323,8 @@
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
-        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-            numSpills, size);
+        final Path filename =
+            mapOutputFile.getSpillFileForWrite(numSpills, size);
         out = rfs.create(filename);
         
         // we don't run the combiner for a single record
@@ -1350,9 +1360,9 @@
         }
         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
           // create spill index file
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          Path indexFilename =
+              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1442,14 +1452,14 @@
       final TaskAttemptID mapId = getTaskID();
 
       for(int i = 0; i < numSpills; i++) {
-        filename[i] = mapOutputFile.getSpillFile(mapId, i);
+        filename[i] = mapOutputFile.getSpillFile(i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
         rfs.rename(filename[0],
             new Path(filename[0].getParent(), "file.out"));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+          rfs.rename(mapOutputFile.getSpillIndexFile(0),
               new Path(filename[0].getParent(),"file.out.index"));
         } else {
           indexCacheList.get(0).writeToFile(
@@ -1460,7 +1470,7 @@
 
       // read in paged indices
       for (int i = indexCacheList.size(); i < numSpills; ++i) {
-        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
         indexCacheList.add(new SpillRecord(indexFileName, job));
       }
 
@@ -1468,10 +1478,10 @@
       //lengths for each partition
       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
       finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
-                             finalOutFileSize);
-      Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
-                            mapId, finalIndexFileSize);
+      Path finalOutputFile =
+          mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+      Path finalIndexFile =
+          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
 
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

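The localizeConfiguration change stops writing split.dta unconditionally: the file is produced only for a real map task, and only when the user has opted into IsolationRunner via keep.failed.tasks.files or keep.tasks.files.pattern. A sketch of just that gate, with a plain Properties lookup standing in for the real JobConf accessors:

import java.util.Properties;

public class SplitFileGateSketch {
  public static void main(String[] args) {
    Properties conf = new Properties();           // stand-in for JobConf
    conf.setProperty("keep.failed.tasks.files", "true");

    boolean isMapOrReduce = true;                 // i.e. not setup/cleanup
    boolean keepFailed =
        Boolean.parseBoolean(conf.getProperty("keep.failed.tasks.files", "false"));
    String keepPattern = conf.getProperty("keep.tasks.files.pattern");

    // Same shape as the patched condition in MapTask.localizeConfiguration().
    if (isMapOrReduce && (keepPattern != null || keepFailed)) {
      System.out.println("would write split.dta for IsolationRunner");
    } else {
      System.out.println("skipping split.dta");
    }
  }
}
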
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Wed Aug 12 16:17:47 2009
@@ -34,13 +34,13 @@
       return false;
     }
     
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
     return true;
   }
 
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 12 16:17:47 2009
@@ -208,7 +208,7 @@
     if (isLocal) {
       // for local jobs
       for(int i = 0; i < numMaps; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+        fileList.add(mapOutputFile.getInputFile(i));
       }
     } else {
       // for non local jobs
@@ -1283,12 +1283,11 @@
         // else, we will check the localFS to find a suitable final location
         // for this path
         TaskAttemptID reduceId = reduceTask.getTaskID();
-        Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
-                                 reduceId.getJobID().toString(),
-                                 reduceId.toString()) 
-                                 + "/map_" +
-                                 loc.getTaskId().getId() + ".out");
-        
+        Path filename =
+            new Path(String.format(
+                MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
+                TaskTracker.OUTPUT, loc.getTaskId().getId()));
+
         // Copy the map output to a temp file whose name is unique to this attempt 
         Path tmpMapOutput = new Path(filename+"-"+id);
         
@@ -2325,8 +2324,8 @@
           sortPhaseFinished = true;
           
           // must spill to disk, but can't retain in-mem for intermediate merge
-          final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
-                            reduceTask.getTaskID(), inMemToDiskBytes);
+          final Path outputPath =
+              mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
               tmpDir, comparator, reporter, spilledRecordsCounter, null,
@@ -2620,8 +2619,8 @@
         long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
         int noInMemorySegments = inMemorySegments.size();
 
-        Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
-                          reduceTask.getTaskID(), mergeOutputSize);
+        Path outputPath =
+            mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
 
         Writer writer = 
           new Writer(conf, rfs, outputPath,

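On the reduce side the same attempt-relative scheme applies: a local job enumerates map_0.out through map_{numMaps-1}.out via getInputFile(i), and the shuffle now builds its target path from the shared format string instead of hand-concatenating it. A hedged sketch of the enumeration, with OUTPUT again assumed to stand in for TaskTracker.OUTPUT:

import java.util.ArrayList;
import java.util.List;

public class ReduceInputListSketch {
  static final String OUTPUT = "output"; // assumed stand-in for TaskTracker.OUTPUT
  static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";

  public static void main(String[] args) {
    int numMaps = 4;
    List<String> fileList = new ArrayList<String>();
    // Mirrors the local-job branch of the patched code: one input per map.
    for (int i = 0; i < numMaps; i++) {
      fileList.add(String.format(REDUCE_INPUT_FILE_FORMAT_STRING, OUTPUT, i));
    }
    System.out.println(fileList); // [output/map_0.out, ..., output/map_3.out]
  }
}
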
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Aug 12 16:17:47 2009
@@ -37,7 +37,7 @@
     }
     
     // cleanup from failures
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
     return true;
   }
   
@@ -46,6 +46,6 @@
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
     getTask().getProgress().setStatus("closed");
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Aug 12 16:17:47 2009
@@ -150,7 +150,6 @@
                                                     TaskStatus.Phase.MAP : 
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
-    this.mapOutputFile.setJobId(taskId.getJobID());
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
   }
 
@@ -309,7 +308,6 @@
     partition = in.readInt();
     numSlotsRequired = in.readInt();
     taskStatus.readFields(in);
-    this.mapOutputFile.setJobId(taskId.getJobID()); 
     skipRanges.readFields(in);
     currentRecIndexIterator = skipRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Wed Aug 12 16:17:47 2009
@@ -17,13 +17,17 @@
 */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -45,27 +49,95 @@
   public Configuration getConf() {
     return conf;
   }
-  
+
+  // The list of directory paths specified in the variable mapred.local.dir.
+  // This is used to determine which of the configured directories is picked
+  // for storing data for a particular task.
+  protected String[] mapredLocalDirs;
+
   public void setConf(Configuration conf) {
     this.conf = conf;
+    mapredLocalDirs = conf.getStrings("mapred.local.dir");
   }
-  
+
+  /**
+   * Sets up the permissions of the following directories on all the configured
+   * disks:
+   * <ul>
+   * <li>mapred-local directories</li>
+   * <li>Job cache directories</li>
+   * <li>Archive directories</li>
+   * <li>Hadoop log directories</li>
+   * </ul>
+   */
+  void setup() {
+    for (String localDir : this.mapredLocalDirs) {
+      // Set up the mapred-local directories.
+      File mapredlocalDir = new File(localDir);
+      if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
+        LOG.warn("Unable to create mapred-local directory : "
+            + mapredlocalDir.getPath());
+      } else {
+        PermissionsHandler.setPermissions(mapredlocalDir,
+            PermissionsHandler.sevenFiveFive);
+      }
+
+      // Set up the cache directory used for distributed cache files
+      File distributedCacheDir =
+          new File(localDir, TaskTracker.getDistributedCacheDir());
+      if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
+        LOG.warn("Unable to create cache directory : "
+            + distributedCacheDir.getPath());
+      } else {
+        PermissionsHandler.setPermissions(distributedCacheDir,
+            PermissionsHandler.sevenFiveFive);
+      }
+
+      // Set up the jobcache directory
+      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+      if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
+        LOG.warn("Unable to create job cache directory : "
+            + jobCacheDir.getPath());
+      } else {
+        PermissionsHandler.setPermissions(jobCacheDir,
+            PermissionsHandler.sevenFiveFive);
+      }
+    }
+
+    // Set up the user log directory
+    File taskLog = TaskLog.getUserLogDir();
+    if (!taskLog.exists() && !taskLog.mkdirs()) {
+      LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
+    } else {
+      PermissionsHandler.setPermissions(taskLog,
+          PermissionsHandler.sevenFiveFive);
+    }
+  }
+
   /**
-   * Setup task controller component.
+   * Take task-controller specific actions to initialize a job. This involves
+   * setting appropriate permissions on job files so that they are accessible
+   * only by the user's tasks.
    * 
+   * @throws IOException
    */
-  abstract void setup();
-  
-  
+  abstract void initializeJob(JobInitializationContext context) throws IOException;
+
   /**
    * Launch a task JVM
    * 
-   * This method defines how a JVM will be launched to run a task.
+   * This method defines how a JVM will be launched to run a task. Each
+   * task-controller should also do an
+   * {@link #initializeTask(TaskControllerContext)} inside this method so as to
+   * initialize the task before launching it. This allows task-controller
+   * specific optimizations that combine task initialization and launch.
+   * 
    * @param context the context associated to the task
    */
   abstract void launchTaskJVM(TaskControllerContext context)
                                       throws IOException;
-  
+
   /**
    * Top level cleanup a task JVM method.
    *
@@ -90,47 +162,44 @@
     }
     killTask(context);
   }
-  
-  /**
-   * Perform initializing actions required before a task can run.
-   * 
-   * For instance, this method can be used to setup appropriate
-   * access permissions for files and directories that will be
-   * used by tasks. Tasks use the job cache, log, PID and distributed cache
-   * directories and files as part of their functioning. Typically,
-   * these files are shared between the daemon and the tasks
-   * themselves. So, a TaskController that is launching tasks
-   * as different users can implement this method to setup
-   * appropriate ownership and permissions for these directories
-   * and files.
-   */
-  abstract void initializeTask(TaskControllerContext context);
-  
-  
+
+  /** Perform initializing actions required before a task can run.
+    * 
+    * For instance, this method can be used to setup appropriate
+    * access permissions for files and directories that will be
+    * used by tasks. Tasks use the job cache, log, and distributed cache
+    * directories and files as part of their functioning. Typically,
+    * these files are shared between the daemon and the tasks
+    * themselves. So, a TaskController that is launching tasks
+    * as different users can implement this method to setup
+    * appropriate ownership and permissions for these directories
+    * and files.
+    */
+  abstract void initializeTask(TaskControllerContext context)
+      throws IOException;
+
   /**
    * Contains task information required for the task controller.  
    */
   static class TaskControllerContext {
     // task being executed
-    Task task; 
-    // the JVM environment for the task
-    JvmEnv env;
-    // the Shell executor executing the JVM for this task
-    ShellCommandExecutor shExec; 
-    // process handle of task JVM
-    String pid;
-    // waiting time before sending SIGKILL to task JVM after sending SIGTERM
-    long sleeptimeBeforeSigkill;
+    Task task;
+    ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
+
+    // Information used only when this context is used for launching new tasks.
+    JvmEnv env;     // the JVM environment for the task.
+
+    // Information used only when this context is used for destroying a task jvm.
+    String pid; // process handle of task JVM.
+    long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+  }
+
+  static class JobInitializationContext {
+    JobID jobid;
+    File workDir;
+    String user;
   }
 
-  /**
-   * Method which is called after the job is localized so that task controllers
-   * can implement their own job localization logic.
-   * 
-   * @param tip  Task of job for which localization happens.
-   */
-  abstract void initializeJob(JobID jobId);
-  
   /**
   * Sends a graceful terminate signal to taskJVM and its sub-processes. 
    *   
@@ -144,6 +213,5 @@
    * 
    * @param context task context
    */
-  
   abstract void killTask(TaskControllerContext context);
 }

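TaskController.setup() now walks every mapred.local.dir entry and creates the mapred-local, distributed-cache and jobcache directories with 755 permissions, plus the user-log directory. A minimal sketch of that create-then-chmod loop; the java.io.File permission calls and subdirectory names below are stand-ins for TaskTracker.PermissionsHandler and the real TaskTracker constants:

import java.io.File;

public class LocalDirSetupSketch {
  // Rough stand-in for PermissionsHandler.setPermissions(dir, sevenFiveFive).
  static void setPermissions755(File dir) {
    dir.setReadable(true, false);   // r for everyone
    dir.setExecutable(true, false); // x for everyone
    dir.setWritable(false, false);  // clear write bits...
    dir.setWritable(true, true);    // ...then w for the owner only
  }

  public static void main(String[] args) {
    String[] mapredLocalDirs = { "/tmp/mapred/local0", "/tmp/mapred/local1" };
    // Subdirectory names here are illustrative, not the exact constants.
    String[] subDirs = { "", "taskTracker/archive", "taskTracker/jobcache" };
    for (String localDir : mapredLocalDirs) {
      for (String sub : subDirs) {
        File dir = sub.isEmpty() ? new File(localDir) : new File(localDir, sub);
        if (!dir.exists() && !dir.mkdirs()) {
          System.err.println("Unable to create directory : " + dir.getPath());
        } else {
          setPermissions755(dir);
        }
      }
    }
  }
}
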
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Wed Aug 12 16:17:47 2009
@@ -54,9 +54,10 @@
   private static final Log LOG =
     LogFactory.getLog(TaskLog.class);
 
+  static final String USERLOGS_DIR_NAME = "userlogs";
+
   private static final File LOG_DIR = 
-    new File(System.getProperty("hadoop.log.dir"), 
-             "userlogs").getAbsoluteFile();
+    new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
   
   static LocalFileSystem localFS = null;
   static {
@@ -156,8 +157,12 @@
       return new File(getBaseDir(taskid), "log.index");
     }
   }
-  
-  private static File getBaseDir(String taskid) {
+
+  static String getBaseLogDir() {
+    return System.getProperty("hadoop.log.dir");
+  }
+
+  static File getBaseDir(String taskid) {
     return new File(LOG_DIR, taskid);
   }
   private static long prevOutLength;

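The TaskLog change factors the base log directory out behind getBaseLogDir() and names the userlogs subdirectory with a constant, so other classes can derive the same paths. In sketch form (the default value is supplied only so the sketch runs anywhere):

import java.io.File;

public class UserLogDirSketch {
  static final String USERLOGS_DIR_NAME = "userlogs";

  // Mirrors TaskLog.getBaseLogDir(): the hadoop.log.dir system property.
  static String getBaseLogDir() {
    return System.getProperty("hadoop.log.dir", "/tmp/hadoop-logs");
  }

  public static void main(String[] args) {
    File logDir = new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
    System.out.println("user logs under: " + logDir);
    // Per-task directory, as in getBaseDir(taskid):
    System.out.println(new File(logDir, "attempt_200908121617_0001_m_000000_0"));
  }
}
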
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Aug 12 16:17:47 2009
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
@@ -77,7 +78,7 @@
     this.t = tip.getTask();
     this.tracker = tracker;
     this.conf = conf;
-    this.mapOutputFile = new MapOutputFile(t.getJobID());
+    this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
     this.jvmManager = tracker.getJvmManagerInstance();
   }
@@ -121,213 +122,45 @@
       TaskAttemptID taskid = t.getTaskID();
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-      
+
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
+      // We don't create any symlinks yet, so whether workDir actually exists
+      // on the file system doesn't matter.
       setupDistributedCache(lDirAlloc, workDir, archives, files);
-          
+
+      // Set up the child task's configuration. After this call, no localization
+      // of files should happen in the TaskTracker's process space. Any changes to
+      // the conf object after this will NOT be reflected to the child.
+      setupChildTaskConfiguration(lDirAlloc);
+
       if (!prepare()) {
         return;
       }
 
-      // Accumulates class paths for child.
-      List<String> classPaths = new ArrayList<String>();
-      // start with same classpath as parent process
-      appendSystemClasspaths(classPaths);
-
-      if (!workDir.mkdirs()) {
-        if (!workDir.isDirectory()) {
-          LOG.fatal("Mkdirs failed to create " + workDir.toString());
-        }
-      }
-
-      // include the user specified classpath
-      appendJobJarClasspaths(conf.getJar(), classPaths);
-  		
-      // Distributed cache paths
-      appendDistributedCacheClasspaths(conf, archives, files, classPaths);
-      
-      // Include the working dir too
-      classPaths.add(workDir.toString());
-      
       // Build classpath
-      
-      
-      //  Build exec child JVM args.
-      Vector<String> vargs = new Vector<String>(8);
-      File jvm =                                  // use same jvm as parent
-        new File(new File(System.getProperty("java.home"), "bin"), "java");
-
-      vargs.add(jvm.toString());
-
-      // Add child (task) java-vm options.
-      //
-      // The following symbols if present in mapred.child.java.opts value are
-      // replaced:
-      // + @taskid@ is interpolated with value of TaskID.
-      // Other occurrences of @ will not be altered.
-      //
-      // Example with multiple arguments and substitutions, showing
-      // jvm GC logging, and start of a passwordless JVM JMX agent so can
-      // connect with jconsole and the likes to watch child memory, threads
-      // and get thread dumps.
-      //
-      //  <property>
-      //    <name>mapred.child.java.opts</name>
-      //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
-      //           -Dcom.sun.management.jmxremote.authenticate=false \
-      //           -Dcom.sun.management.jmxremote.ssl=false \
-      //    </value>
-      //  </property>
-      //
-      String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
-      javaOpts = javaOpts.replace("@taskid@", taskid.toString());
-      String [] javaOptsSplit = javaOpts.split(" ");
-      
-      // Add java.library.path; necessary for loading native libraries.
-      //
-      // 1. To support native-hadoop library i.e. libhadoop.so, we add the 
-      //    parent processes' java.library.path to the child. 
-      // 2. We also add the 'cwd' of the task to it's java.library.path to help 
-      //    users distribute native libraries via the DistributedCache.
-      // 3. The user can also specify extra paths to be added to the 
-      //    java.library.path via mapred.child.java.opts.
-      //
-      String libraryPath = System.getProperty("java.library.path");
-      if (libraryPath == null) {
-        libraryPath = workDir.getAbsolutePath();
-      } else {
-        libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
-      }
-      boolean hasUserLDPath = false;
-      for(int i=0; i<javaOptsSplit.length ;i++) { 
-        if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
-          javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
-          hasUserLDPath = true;
-          break;
-        }
-      }
-      if(!hasUserLDPath) {
-        vargs.add("-Djava.library.path=" + libraryPath);
-      }
-      for (int i = 0; i < javaOptsSplit.length; i++) {
-        vargs.add(javaOptsSplit[i]);
-      }
-
-      // add java.io.tmpdir given by mapred.child.tmp
-      String tmp = conf.get("mapred.child.tmp", "./tmp");
-      Path tmpDir = new Path(tmp);
-      
-      // if temp directory path is not absolute 
-      // prepend it with workDir.
-      if (!tmpDir.isAbsolute()) {
-        tmpDir = new Path(workDir.toString(), tmp);
-      }
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-      }
-      vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
-
-      // Add classpath.
-      vargs.add("-classpath");
-      String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
-      vargs.add(classPath);
+      List<String> classPaths = getClassPaths(conf, workDir, archives, files);
 
-      // Setup the log4j prop
       long logSize = TaskLog.getTaskLogLength(conf);
-      vargs.add("-Dhadoop.log.dir=" + 
-          new File(System.getProperty("hadoop.log.dir")
-          ).getAbsolutePath());
-      vargs.add("-Dhadoop.root.logger=INFO,TLA");
-      vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
-      vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
-
-      if (conf.getProfileEnabled()) {
-        if (conf.getProfileTaskRange(t.isMapTask()
-                                     ).isIncluded(t.getPartition())) {
-          File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
-          vargs.add(String.format(conf.getProfileParams(), prof.toString()));
-        }
-      }
 
-      // Add main class and its arguments 
-      vargs.add(Child.class.getName());  // main of Child
-      // pass umbilical address
-      InetSocketAddress address = tracker.getTaskTrackerReportAddress();
-      vargs.add(address.getAddress().getHostAddress()); 
-      vargs.add(Integer.toString(address.getPort())); 
-      vargs.add(taskid.toString());                      // pass task identifier
+      //  Build exec child JVM args.
+      Vector<String> vargs =
+          getVMArgs(taskid, workDir, classPaths, logSize);
 
       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
-      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
-      List<String> setup = null;
-      if (ulimitCmd != null) {
-        setup = new ArrayList<String>();
-        for (String arg : ulimitCmd) {
-          setup.add(arg);
-        }
-      }
+      List<String> setup = getVMSetupCmd();
 
       // Set up the redirection of the task's stdout and stderr streams
-      File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-      File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
-      boolean b = stdout.getParentFile().mkdirs();
-      if (!b) {
-        LOG.warn("mkdirs failed. Ignoring");
-      }
-      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
+      File[] logFiles = prepareLogFiles(taskid);
+      File stdout = logFiles[0];
+      File stderr = logFiles[1];
+      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
+          stderr);
 
       Map<String, String> env = new HashMap<String, String>();
-      StringBuffer ldLibraryPath = new StringBuffer();
-      ldLibraryPath.append(workDir.toString());
-      String oldLdLibraryPath = null;
-      oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
-      if (oldLdLibraryPath != null) {
-        ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
-        ldLibraryPath.append(oldLdLibraryPath);
-      }
-      env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-      
-      // add the env variables passed by the user
-      String mapredChildEnv = conf.get("mapred.child.env");
-      if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
-        String childEnvs[] = mapredChildEnv.split(",");
-        for (String cEnv : childEnvs) {
-          try {
-            String[] parts = cEnv.split("="); // split on '='
-            String value = env.get(parts[0]);
-            if (value != null) {
-              // replace $env with the child's env constructed by tt's
-              // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
-              value = parts[1].replace("$" + parts[0], value);
-            } else {
-              // this key is not configured by the tt for the child .. get it 
-              // from the tt's env
-              // example PATH=$PATH:/tmp
-              value = System.getenv(parts[0]);
-              if (value != null) {
-                // the env key is present in the tt's env
-                value = parts[1].replace("$" + parts[0], value);
-              } else {
-                // the env key is note present anywhere .. simply set it
-                // example X=$X:/tmp or X=/tmp
-                value = parts[1].replace("$" + parts[0], "");
-              }
-            }
-            env.put(parts[0], value);
-          } catch (Throwable t) {
-            // set the error msg
-            errorInfo = "Invalid User environment settings : " + mapredChildEnv 
-                        + ". Failed to parse user-passed environment param."
-                        + " Expecting : env1=value1,env2=value2...";
-            LOG.warn(errorInfo);
-            throw t;
-          }
-        }
-      }
+      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
 
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
@@ -355,7 +188,7 @@
         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
       }
     } catch (Throwable throwable) {
-      LOG.warn(t.getTaskID() + errorInfo, throwable);
+      LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
       Throwable causeThrowable = new Throwable(errorInfo, throwable);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       causeThrowable.printStackTrace(new PrintStream(baos));
@@ -385,15 +218,327 @@
     }
   }
 
+  /**
+   * Prepare the log files for the task
+   * 
+   * @param taskid
+   * @return an array of files. The first file is stdout, the second is stderr.
+   */
+  static File[] prepareLogFiles(TaskAttemptID taskid) {
+    File[] logFiles = new File[2];
+    logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    File logDir = logFiles[0].getParentFile();
+    boolean b = logDir.mkdirs();
+    if (!b) {
+      LOG.warn("mkdirs failed. Ignoring");
+    } else {
+      PermissionsHandler.setPermissions(logDir,
+          PermissionsHandler.sevenZeroZero);
+    }
+    return logFiles;
+  }
+
+  /**
+   * Write the child's configuration to the disk and set it in configuration so
+   * that the child can pick it up from there.
+   * 
+   * @param lDirAlloc
+   * @throws IOException
+   */
+  void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
+      throws IOException {
+
+    Path localTaskFile =
+        lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
+            .getJobID().toString(), t.getTaskID().toString(), t
+            .isTaskCleanupTask()), conf);
+
+    // write the child's task configuration file to the local disk
+    writeLocalTaskFile(localTaskFile.toString(), conf);
+
+    // Set the final job file in the task. The child needs to know the correct
+    // path to job.xml. So set this path accordingly.
+    t.setJobFile(localTaskFile.toString());
+  }
+
+  /**
+   * @return
+   */
+  private List<String> getVMSetupCmd() {
+    String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+    List<String> setup = null;
+    if (ulimitCmd != null) {
+      setup = new ArrayList<String>();
+      for (String arg : ulimitCmd) {
+        setup.add(arg);
+      }
+    }
+    return setup;
+  }
+
+  /**
+   * @param taskid
+   * @param workDir
+   * @param classPaths
+   * @param logSize
+   * @return
+   * @throws IOException
+   */
+  private Vector<String> getVMArgs(TaskAttemptID taskid, File workDir,
+      List<String> classPaths, long logSize)
+      throws IOException {
+    Vector<String> vargs = new Vector<String>(8);
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+
+    vargs.add(jvm.toString());
+
+    // Add child (task) java-vm options.
+    //
+    // The following symbols if present in mapred.child.java.opts value are
+    // replaced:
+    // + @taskid@ is interpolated with value of TaskID.
+    // Other occurrences of @ will not be altered.
+    //
+    // Example with multiple arguments and substitutions, showing
+    // jvm GC logging, and start of a passwordless JVM JMX agent so can
+    // connect with jconsole and the likes to watch child memory, threads
+    // and get thread dumps.
+    //
+    //  <property>
+    //    <name>mapred.child.java.opts</name>
+    //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
+    //           -Dcom.sun.management.jmxremote.authenticate=false \
+    //           -Dcom.sun.management.jmxremote.ssl=false \
+    //    </value>
+    //  </property>
+    //
+    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
+    String [] javaOptsSplit = javaOpts.split(" ");
+    
+    // Add java.library.path; necessary for loading native libraries.
+    //
+    // 1. To support native-hadoop library i.e. libhadoop.so, we add the 
+    //    parent processes' java.library.path to the child. 
+    // 2. We also add the 'cwd' of the task to its java.library.path to help 
+    //    users distribute native libraries via the DistributedCache.
+    // 3. The user can also specify extra paths to be added to the 
+    //    java.library.path via mapred.child.java.opts.
+    //
+    String libraryPath = System.getProperty("java.library.path");
+    if (libraryPath == null) {
+      libraryPath = workDir.getAbsolutePath();
+    } else {
+      libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
+    }
+    boolean hasUserLDPath = false;
+    for(int i=0; i<javaOptsSplit.length ;i++) { 
+      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
+        javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
+        hasUserLDPath = true;
+        break;
+      }
+    }
+    if(!hasUserLDPath) {
+      vargs.add("-Djava.library.path=" + libraryPath);
+    }
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+
+    Path childTmpDir = createChildTmpDir(workDir, conf);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // Add classpath.
+    vargs.add("-classpath");
+    String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+    vargs.add(classPath);
+
+    // Setup the log4j prop
+    vargs.add("-Dhadoop.log.dir=" + 
+        new File(System.getProperty("hadoop.log.dir")
+        ).getAbsolutePath());
+    vargs.add("-Dhadoop.root.logger=INFO,TLA");
+    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+
+    if (conf.getProfileEnabled()) {
+      if (conf.getProfileTaskRange(t.isMapTask()
+                                   ).isIncluded(t.getPartition())) {
+        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        vargs.add(String.format(conf.getProfileParams(), prof.toString()));
+      }
+    }
+
+    // Add main class and its arguments 
+    vargs.add(Child.class.getName());  // main of Child
+    // pass umbilical address
+    InetSocketAddress address = tracker.getTaskTrackerReportAddress();
+    vargs.add(address.getAddress().getHostAddress()); 
+    vargs.add(Integer.toString(address.getPort())); 
+    vargs.add(taskid.toString());                      // pass task identifier
+    return vargs;
+  }
+
+  /**
+   * @param workDir
+   * @param conf
+   * @return
+   * @throws IOException
+   */
+  static Path createChildTmpDir(File workDir,
+      JobConf conf)
+      throws IOException {
+
+    // add java.io.tmpdir given by mapred.child.tmp
+    String tmp = conf.get("mapred.child.tmp", "./tmp");
+    Path tmpDir = new Path(tmp);
+
+    // if temp directory path is not absolute, prepend it with workDir.
+    if (!tmpDir.isAbsolute()) {
+      tmpDir = new Path(workDir.toString(), tmp);
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
+        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      }
+    }
+    return tmpDir;
+  }
+
+  /**
+   */
+  private static List<String> getClassPaths(JobConf conf, File workDir,
+      URI[] archives, URI[] files)
+      throws IOException {
+    // Accumulates class paths for child.
+    List<String> classPaths = new ArrayList<String>();
+    // start with same classpath as parent process
+    appendSystemClasspaths(classPaths);
+
+    // include the user specified classpath
+    appendJobJarClasspaths(conf.getJar(), classPaths);
+    
+    // Distributed cache paths
+    appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+    
+    // Include the working dir too
+    classPaths.add(workDir.toString());
+    return classPaths;
+  }
+
+  /**
+   * @param errorInfo
+   * @param workDir
+   * @param env
+   * @return
+   * @throws Throwable
+   */
+  private static String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
+      Map<String, String> env)
+      throws Throwable {
+    StringBuffer ldLibraryPath = new StringBuffer();
+    ldLibraryPath.append(workDir.toString());
+    String oldLdLibraryPath = null;
+    oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
+    if (oldLdLibraryPath != null) {
+      ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+      ldLibraryPath.append(oldLdLibraryPath);
+    }
+    env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+    
+    // add the env variables passed by the user
+    String mapredChildEnv = conf.get("mapred.child.env");
+    if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+      String childEnvs[] = mapredChildEnv.split(",");
+      for (String cEnv : childEnvs) {
+        try {
+          String[] parts = cEnv.split("="); // split on '='
+          String value = env.get(parts[0]);
+          if (value != null) {
+            // replace $env with the child's env constructed by tt's
+            // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+            value = parts[1].replace("$" + parts[0], value);
+          } else {
+            // this key is not configured by the tt for the child .. get it 
+            // from the tt's env
+            // example PATH=$PATH:/tmp
+            value = System.getenv(parts[0]);
+            if (value != null) {
+              // the env key is present in the tt's env
+              value = parts[1].replace("$" + parts[0], value);
+            } else {
+              // the env key is not present anywhere .. simply set it
+              // example X=$X:/tmp or X=/tmp
+              value = parts[1].replace("$" + parts[0], "");
+            }
+          }
+          env.put(parts[0], value);
+        } catch (Throwable t) {
+          // set the error msg
+          errorInfo = "Invalid User environment settings : " + mapredChildEnv 
+                      + ". Failed to parse user-passed environment param."
+                      + " Expecting : env1=value1,env2=value2...";
+          LOG.warn(errorInfo);
+          throw t;
+        }
+      }
+    }
+    return errorInfo;
+  }
+
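
To illustrate the mapred.child.env handling above ("env1=value1,env2=value2",
with $NAME expanded from an existing value or from the tasktracker's
environment, and an unresolved reference collapsing to the empty string), here
is a minimal standalone sketch; the sample values are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class ChildEnvExample {
      public static void main(String[] args) {
        Map<String, String> env = new HashMap<String, String>();
        env.put("LD_LIBRARY_PATH", "/work"); // pre-set by the tasktracker

        String mapredChildEnv = "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp,X=$X:/opt";
        for (String cEnv : mapredChildEnv.split(",")) {
          String[] parts = cEnv.split("=", 2);
          String value = env.get(parts[0]);
          if (value == null) {
            value = System.getenv(parts[0]); // fall back to the TT's own env
          }
          // an unresolved $NAME reference expands to the empty string
          value = parts[1].replace("$" + parts[0], value == null ? "" : value);
          env.put(parts[0], value);
        }
        // e.g. LD_LIBRARY_PATH=/work:/tmp and, when X is unset, X=:/opt
        System.out.println(env);
      }
    }
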
+  /**
+   * Write the task-specific job configuration file.
+   * 
+   * @param jobFile path of the configuration file to write
+   * @param conf the configuration to serialize
+   * @throws IOException
+   */
+  private static void writeLocalTaskFile(String jobFile, JobConf conf)
+      throws IOException {
+    Path localTaskFile = new Path(jobFile);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(localTaskFile, true);
+    OutputStream out = localFs.create(localTaskFile);
+    try {
+      conf.writeXml(out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Prepare the mapred.local.dir for the child. The child is sand-boxed now:
+   * whenever it uses LocalDirAllocator from now on, it will only see files
+   * inside the attempt directory. This is done in the Child's process space.
+   */
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+    String[] localDirs = conf.getStrings("mapred.local.dir");
+    String jobId = t.getJobID().toString();
+    String taskId = t.getTaskID().toString();
+    boolean isCleanup = t.isTaskCleanupTask();
+    StringBuffer childMapredLocalDir =
+        new StringBuffer(localDirs[0] + Path.SEPARATOR
+            + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+    for (int i = 1; i < localDirs.length; i++) {
+      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+          + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+    }
+    LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
+    conf.set("mapred.local.dir", childMapredLocalDir.toString());
+  }
+
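
The rewrite above maps each tasktracker-level local directory to the
corresponding attempt-level directory. A toy sketch of the resulting
comma-separated value, with hypothetical disks and ids:

    public class ChildLocalDirsExample {
      public static void main(String[] args) {
        String[] localDirs = { "/disk1/mapred/local", "/disk2/mapred/local" };
        String taskDir = "taskTracker/jobcache/job_200908121617_0001"
            + "/attempt_200908121617_0001_m_000000_0";
        StringBuilder childLocalDirs = new StringBuilder();
        for (int i = 0; i < localDirs.length; i++) {
          if (i > 0) {
            childLocalDirs.append(",");
          }
          // each disk gets the same per-attempt suffix appended
          childLocalDirs.append(localDirs[i]).append("/").append(taskDir);
        }
        // /disk1/mapred/local/taskTracker/jobcache/job_.../attempt_...,
        // /disk2/mapred/local/taskTracker/jobcache/job_.../attempt_...
        System.out.println(childLocalDirs);
      }
    }
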
   /** Creates the working directory pathname for a task attempt. */ 
   static File formWorkDir(LocalDirAllocator lDirAlloc, 
       TaskAttemptID task, boolean isCleanup, JobConf conf) 
       throws IOException {
-    File workDir = new File(lDirAlloc.getLocalPathToRead(
-        TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
-          task.toString(), isCleanup) 
-        + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
-    return workDir;
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+            .getJobID().toString(), task.toString(), isCleanup), conf);
+
+    return new File(workDir.toString());
   }
 
   private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
@@ -412,7 +557,7 @@
           fileStatus = fileSystem.getFileStatus(
                                     new Path(archives[i].getPath()));
           String cacheId = DistributedCache.makeRelative(archives[i],conf);
-          String cachePath = TaskTracker.getCacheSubdir() + 
+          String cachePath = TaskTracker.getDistributedCacheDir() + 
                                Path.SEPARATOR + cacheId;
           
           localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -438,7 +583,7 @@
           fileStatus = fileSystem.getFileStatus(
                                     new Path(files[i].getPath()));
           String cacheId = DistributedCache.makeRelative(files[i], conf);
-          String cachePath = TaskTracker.getCacheSubdir() +
+          String cachePath = TaskTracker.getDistributedCacheDir() +
                                Path.SEPARATOR + cacheId;
           
           localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -455,20 +600,12 @@
         }
         DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
       }
-      Path localTaskFile = new Path(t.getJobFile());
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(localTaskFile, true);
-      OutputStream out = localFs.create(localTaskFile);
-      try {
-        conf.writeXml(out);
-      } finally {
-        out.close();
-      }
     }
   }
 
-  private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives, 
-      URI[] files, List<String> classPaths) throws IOException {
+  private static void appendDistributedCacheClasspaths(JobConf conf,
+      URI[] archives, URI[] files, List<String> classPaths)
+      throws IOException {
     // Archive paths
     Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
     if (archiveClasspaths != null && archives != null) {
@@ -503,8 +640,9 @@
     }
   }
 
-  private void appendSystemClasspaths(List<String> classPaths) {
-    for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+  private static void appendSystemClasspaths(List<String> classPaths) {
+    for (String c : System.getProperty("java.class.path").split(
+        SYSTEM_PATH_SEPARATOR)) {
       classPaths.add(c);
     }
   }
@@ -586,19 +724,8 @@
       // Do not exit even if symlinks have not been created.
       LOG.warn(StringUtils.stringifyException(ie));
     }
-    // add java.io.tmpdir given by mapred.child.tmp
-    String tmp = conf.get("mapred.child.tmp", "./tmp");
-    Path tmpDir = new Path(tmp);
 
-    // if temp directory path is not absolute
-    // prepend it with workDir.
-    if (!tmpDir.isAbsolute()) {
-      tmpDir = new Path(workDir.toString(), tmp);
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-      }
-    }
+    createChildTmpDir(workDir, conf);
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Aug 12 16:17:47 2009
@@ -65,6 +65,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
@@ -197,10 +198,16 @@
   //for serving map output to the other nodes
 
   static Random r = new Random();
-  private static final String SUBDIR = "taskTracker";
-  private static final String CACHEDIR = "archive";
-  private static final String JOBCACHE = "jobcache";
-  private static final String OUTPUT = "output";
+  static final String SUBDIR = "taskTracker";
+  private static final String DISTCACHEDIR = "distcache";
+  static final String JOBCACHE = "jobcache";
+  static final String OUTPUT = "output";
+  private static final String JARSDIR = "jars";
+  static final String LOCAL_SPLIT_FILE = "split.dta";
+  static final String JOBFILE = "job.xml";
+
+  static final String JOB_LOCAL_DIR = "job.local.dir";
+
   private JobConf fConf;
   private FileSystem localFs;
   private int maxMapSlots;
@@ -388,25 +395,52 @@
     }
   }
 
-  static String getCacheSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+  static String getDistributedCacheDir() {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
   }
 
   static String getJobCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
-  
+
   static String getLocalJobDir(String jobid) {
-	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
+    return getJobCacheSubdir() + Path.SEPARATOR + jobid;
   }
 
-  static String getLocalTaskDir(String jobid, String taskid) {
-	return getLocalTaskDir(jobid, taskid, false) ; 
+  static String getLocalJobConfFile(String jobid) {
+    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+  }
+
+  static String getTaskConfFile(String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
+        + TaskTracker.JOBFILE;
+  }
+
+  static String getJobJarsDir(String jobid) {
+    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+  }
+
+  static String getJobJarFile(String jobid) {
+    return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+  }
+
+  static String getJobWorkDir(String jobid) {
+    return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+  }
+
+  static String getLocalSplitFile(String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+        + TaskTracker.LOCAL_SPLIT_FILE;
   }
 
   static String getIntermediateOutputDir(String jobid, String taskid) {
-	return getLocalTaskDir(jobid, taskid) 
-           + Path.SEPARATOR + TaskTracker.OUTPUT ; 
+    return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+        + TaskTracker.OUTPUT;
+  }
+
+  static String getLocalTaskDir(String jobid, String taskid) {
+    return getLocalTaskDir(jobid, taskid, false);
   }
 
   static String getLocalTaskDir(String jobid, 
@@ -418,7 +452,17 @@
 	}
 	return taskDir;
   }
-  
+
+  static String getTaskWorkDir(String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String dir =
+      getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      dir = dir + TASK_CLEANUP_SUFFIX;
+    }
+    return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+  }
+
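
Taken together, these helpers fix the per-job layout under each local disk. A
sketch that prints the paths they would produce for a hypothetical job and
attempt id, assuming MRConstants.WORKDIR is "work" and the non-cleanup task
directory is <job-dir>/<attempt-id>, as the code above suggests:

    public class LocalDirLayoutExample {
      public static void main(String[] args) {
        String jobid = "job_200908121617_0001";
        String taskid = "attempt_200908121617_0001_m_000000_0";
        String jobDir = "taskTracker/jobcache/" + jobid;          // getLocalJobDir
        System.out.println(jobDir + "/job.xml");                  // getLocalJobConfFile
        System.out.println(jobDir + "/jars/job.jar");             // getJobJarFile
        System.out.println(jobDir + "/work");                     // getJobWorkDir
        System.out.println(jobDir + "/" + taskid + "/split.dta"); // getLocalSplitFile
        System.out.println(jobDir + "/" + taskid + "/output");    // getIntermediateOutputDir
        System.out.println(jobDir + "/" + taskid + "/work");      // getTaskWorkDir
      }
    }
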
   String getPid(TaskAttemptID tid) {
     TaskInProgress tip = tasks.get(tid);
     if (tip != null) {
@@ -762,10 +806,233 @@
 
   // intialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
-    Path localJarFile = null;
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
-    Path jobFile = new Path(t.getJobFile());
+    RunningJob rjob = addTaskToJob(jobId, tip);
+
+    synchronized (rjob) {
+      if (!rjob.localized) {
+
+        JobConf localJobConf = localizeJobFiles(t);
+
+        // Now initialize the job via the task-controller to set the
+        // ownership/permissions of the jars and the job work-dir. Note that
+        // initializeJob should be the last call, made only after every other
+        // directory/file directly under the job directory has been created.
+        JobInitializationContext context = new JobInitializationContext();
+        context.jobid = jobId;
+        context.user = localJobConf.getUser();
+        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
+        taskController.initializeJob(context);
+
+        rjob.jobConf = localJobConf;
+        rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+                             localJobConf.getKeepFailedTaskFiles());
+        rjob.localized = true;
+      }
+    }
+    launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
+  }
+
+  /**
+   * Localize the job on this tasktracker. Specifically
+   * <ul>
+   * <li>Cleanup and create job directories on all disks</li>
+   * <li>Download the job config file job.xml from the FS</li>
+   * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
+   * in the configuration.</li>
+   * <li>Download the job jar file job.jar from the FS, unjar it and set jar
+   * file in the configuration.</li>
+   * </ul>
+   * 
+   * @param t task whose job has to be localized on this TT
+   * @return the modified job configuration to be used for all the tasks of this
+   *         job as a starting point.
+   * @throws IOException
+   */
+  JobConf localizeJobFiles(Task t)
+      throws IOException {
+    JobID jobId = t.getJobID();
+
+    // Initialize the job directories first
+    FileSystem localFs = FileSystem.getLocal(fConf);
+    initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+    // Download the job.xml for this job from the system FS
+    Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId);
+
+    JobConf localJobConf = new JobConf(localJobFile);
+
+    // create the 'job-work' directory: job-specific shared directory for use as
+    // scratch space by all tasks of the same job running on this TaskTracker. 
+    Path workDir =
+        lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
+            fConf);
+    if (!localFs.mkdirs(workDir)) {
+      throw new IOException("Mkdirs failed to create " 
+                  + workDir.toString());
+    }
+    System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
+    localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+
+    // Download the job.jar for this job from the system FS
+    localizeJobJarFile(jobId, localFs, localJobConf);
+    return localJobConf;
+  }
+
+  static class PermissionsHandler {
+    /**
+     * Permission information useful for setting permissions for a given path.
+     * Using this, one can set all possible combinations of permissions for the
+     * owner of the file. But permissions for the group and all others can only
+     * be set together, i.e. permissions for group cannot be set different from
+     * those for others and vice versa.
+     */
+    static class PermissionsInfo {
+      public boolean readPermissions;
+      public boolean writePermissions;
+      public boolean executablePermissions;
+      public boolean readPermsOwnerOnly;
+      public boolean writePermsOwnerOnly;
+      public boolean executePermsOwnerOnly;
+
+      /**
+       * Create a permissions-info object with the given attributes
+       * 
+       * @param readPerms whether the file should be readable
+       * @param writePerms whether the file should be writable
+       * @param executePerms whether the file should be executable
+       * @param readOwnerOnly whether read permission is restricted to the owner
+       * @param writeOwnerOnly whether write permission is restricted to the owner
+       * @param executeOwnerOnly whether execute permission is restricted to the owner
+       */
+      public PermissionsInfo(boolean readPerms, boolean writePerms,
+          boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
+          boolean executeOwnerOnly) {
+        readPermissions = readPerms;
+        writePermissions = writePerms;
+        executablePermissions = executePerms;
+        readPermsOwnerOnly = readOwnerOnly;
+        writePermsOwnerOnly = writeOwnerOnly;
+        executePermsOwnerOnly = executeOwnerOnly;
+      }
+    }
+
+    /**
+     * Set permission on the given file path using the specified permissions
+     * information. We use the Java API to set permissions instead of spawning
+     * chmod processes; this saves a lot of time. Using this, one can set all possible
+     * combinations of permissions for the owner of the file. But permissions
+     * for the group and all others can only be set together, i.e. permissions
+     * for group cannot be set different from those for others and vice versa.
+     * 
+     * This method should satisfy the needs of most of the applications. For
+     * those it doesn't, {@link FileUtil#chmod} can be used.
+     * 
+     * @param f file path
+     * @param pInfo permissions information
+     * @return true if success, false otherwise
+     */
+    static boolean setPermissions(File f, PermissionsInfo pInfo) {
+      if (pInfo == null) {
+        LOG.debug(" PermissionsInfo is null, returning.");
+        return true;
+      }
+
+      LOG.debug("Setting permission for " + f.getAbsolutePath());
+
+      boolean ret = true;
+
+      // Clear all the flags
+      ret = f.setReadable(false, false) && ret;
+      ret = f.setWritable(false, false) && ret;
+      ret = f.setExecutable(false, false) && ret;
+
+      ret =
+          f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly)
+              && ret;
+      LOG.debug("Readable status for " + f + " set to " + ret);
+      ret =
+          f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
+              && ret;
+      LOG.debug("Writable status for " + f + " set to " + ret);
+      ret =
+          f.setExecutable(pInfo.executablePermissions,
+              pInfo.executePermsOwnerOnly)
+              && ret;
+
+      LOG.debug("Executable status for " + f + " set to " + ret);
+      return ret;
+    }
+
+    /**
+     * Permissions rwxr-xr-x (755)
+     */
+    static PermissionsInfo sevenFiveFive =
+        new PermissionsInfo(true, true, true, false, true, false);
+    /**
+     * Completely private permissions: rwx------ (700)
+     */
+    static PermissionsInfo sevenZeroZero =
+        new PermissionsInfo(true, true, true, true, true, true);
+  }
+
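
A minimal usage sketch of the handler above, locking a hypothetical directory
down to owner-only access, the same way initializeJobDirs below treats each
job directory. Since PermissionsHandler is package-private, the sketch assumes
it is compiled into org.apache.hadoop.mapred:

    package org.apache.hadoop.mapred; // PermissionsHandler is package-private

    import java.io.File;

    public class PermissionsExample {
      public static void main(String[] args) {
        File jobDir = new File("/tmp/example-job-dir"); // hypothetical path
        jobDir.mkdirs();
        // 700: the owner gets read/write/execute; group and others get nothing
        boolean ok = TaskTracker.PermissionsHandler.setPermissions(
            jobDir, TaskTracker.PermissionsHandler.sevenZeroZero);
        System.out.println("permissions set: " + ok);
      }
    }
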
+  /**
+   * Prepare the job directories for a given job. To be called by the job
+   * localization code, only if the job is not already localized.
+   * 
+   * <br>
+   * Here, we set 700 permissions on the job directories created on all disks.
+   * This prevents any misuse by other users until
+   * {@link TaskController#initializeJob(JobInitializationContext)} runs later
+   * and sets the proper private permissions on the job directories. <br>
+   * 
+   * @param jobId job whose directories are being initialized
+   * @param fs the local filesystem on which to create the directories
+   * @param localDirs the configured mapred.local.dir entries
+   * @throws IOException
+   */
+  private static void initializeJobDirs(JobID jobId, FileSystem fs,
+      String[] localDirs)
+      throws IOException {
+    boolean initJobDirStatus = false;
+    String jobDirPath = getLocalJobDir(jobId.toString());
+    for (String localDir : localDirs) {
+      Path jobDir = new Path(localDir, jobDirPath);
+      if (fs.exists(jobDir)) {
+        // this will happen on a partial execution of localizeJob. Sometimes
+        // copying job.xml to the local disk succeeds but copying job.jar might
+        // throw an exception. We should clean up and then try again.
+        fs.delete(jobDir, true);
+      }
+
+      boolean jobDirStatus = fs.mkdirs(jobDir);
+      if (!jobDirStatus) {
+        LOG.warn("Not able to create job directory " + jobDir.toString());
+      }
+
+      initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+      // job-dir has to be private to the TT
+      PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
+          PermissionsHandler.sevenZeroZero);
+    }
+
+    if (!initJobDirStatus) {
+      throw new IOException("Not able to initialize job directories "
+          + "in any of the configured local directories for job "
+          + jobId.toString());
+    }
+  }
+
+  /**
+   * Download the job configuration file from the FS.
+   * 
+   * @param jobFile the job configuration file on the system FS
+   * @param jobId id of the job whose configuration is being localized
+   * @return the local file system path of the downloaded file.
+   * @throws IOException
+   */
+  private Path localizeJobConfFile(Path jobFile, JobID jobId)
+      throws IOException {
     // Get sizes of JobFile and JarFile
     // sizes are -1 if they are not present.
     FileStatus status = null;
@@ -776,82 +1043,95 @@
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
     }
-    Path localJobFile = lDirAlloc.getLocalPathForWrite(
-                                    getLocalJobDir(jobId.toString())
-                                    + Path.SEPARATOR + "job.xml",
-                                    jobFileSize, fConf);
-    RunningJob rjob = addTaskToJob(jobId, tip);
-    synchronized (rjob) {
-      if (!rjob.localized) {
-  
-        FileSystem localFs = FileSystem.getLocal(fConf);
-        // this will happen on a partial execution of localizeJob.
-        // Sometimes the job.xml gets copied but copying job.jar
-        // might throw out an exception
-        // we should clean up and then try again
-        Path jobDir = localJobFile.getParent();
-        if (localFs.exists(jobDir)){
-          localFs.delete(jobDir, true);
-          boolean b = localFs.mkdirs(jobDir);
-          if (!b)
-            throw new IOException("Not able to create job directory "
-                                  + jobDir.toString());
-        }
-        systemFS.copyToLocalFile(jobFile, localJobFile);
-        JobConf localJobConf = new JobConf(localJobFile);
-        
-        // create the 'work' directory
-        // job-specific shared directory for use as scratch space 
-        Path workDir = lDirAlloc.getLocalPathForWrite(
-                         (getLocalJobDir(jobId.toString())
-                         + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
-        if (!localFs.mkdirs(workDir)) {
-          throw new IOException("Mkdirs failed to create " 
-                      + workDir.toString());
-        }
-        System.setProperty("job.local.dir", workDir.toString());
-        localJobConf.set("job.local.dir", workDir.toString());
-        
-        // copy Jar file to the local FS and unjar it.
-        String jarFile = localJobConf.getJar();
-        long jarFileSize = -1;
-        if (jarFile != null) {
-          Path jarFilePath = new Path(jarFile);
-          try {
-            status = systemFS.getFileStatus(jarFilePath);
-            jarFileSize = status.getLen();
-          } catch(FileNotFoundException fe) {
-            jarFileSize = -1;
-          }
-          // Here we check for and we check five times the size of jarFileSize
-          // to accommodate for unjarring the jar file in work directory 
-          localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
-                                     getLocalJobDir(jobId.toString())
-                                     + Path.SEPARATOR + "jars",
-                                     5 * jarFileSize, fConf), "job.jar");
-          if (!localFs.mkdirs(localJarFile.getParent())) {
-            throw new IOException("Mkdirs failed to create jars directory "); 
-          }
-          systemFS.copyToLocalFile(jarFilePath, localJarFile);
-          localJobConf.setJar(localJarFile.toString());
-          OutputStream out = localFs.create(localJobFile);
-          try {
-            localJobConf.writeXml(out);
-          } finally {
-            out.close();
-          }
-          // also unjar the job.jar files 
-          RunJar.unJar(new File(localJarFile.toString()),
-                       new File(localJarFile.getParent().toString()));
-        }
-        rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
-                             localJobConf.getKeepFailedTaskFiles());
-        rjob.localized = true;
-        rjob.jobConf = localJobConf;
-        taskController.initializeJob(jobId);
+
+    Path localJobFile =
+        lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+            jobFileSize, fConf);
+
+    // Download job.xml
+    systemFS.copyToLocalFile(jobFile, localJobFile);
+    return localJobFile;
+  }
+
+  /**
+   * Download the job jar file from FS to the local file system and unjar it.
+   * Set the local jar file in the passed configuration.
+   * 
+   * @param jobId job whose jar file is being localized
+   * @param localFs the local filesystem
+   * @param localJobConf the job configuration; updated with the local jar path
+   * @throws IOException
+   */
+  private void localizeJobJarFile(JobID jobId, FileSystem localFs,
+      JobConf localJobConf)
+      throws IOException {
+    // copy Jar file to the local FS and unjar it.
+    String jarFile = localJobConf.getJar();
+    FileStatus status = null;
+    long jarFileSize = -1;
+    if (jarFile != null) {
+      Path jarFilePath = new Path(jarFile);
+      try {
+        status = systemFS.getFileStatus(jarFilePath);
+        jarFileSize = status.getLen();
+      } catch (FileNotFoundException fe) {
+        jarFileSize = -1;
+      }
+      // Here we ask for five times jarFileSize, to leave room for both the
+      // jar file itself and its unjarred contents in the jars directory
+      Path localJarFile =
+          lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
+              5 * jarFileSize, fConf);
+
+      // Download job.jar
+      systemFS.copyToLocalFile(jarFilePath, localJarFile);
+
+      localJobConf.setJar(localJarFile.toString());
+
+      // Also un-jar the job.jar file. We un-jar it so that classes inside
+      // sub-directories, e.g. lib/ and classes/, are available on the classpath
+      RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
+          .getParent().toString()));
+    }
+  }
+
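
The 5 * jarFileSize allocation above reserves room for both the downloaded jar
and its unpacked contents. For reference, a standalone sketch of the unjar
step itself using only java.util.jar, with hypothetical paths (RunJar.unJar
performs the equivalent with more care):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.util.Enumeration;
    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;

    public class UnJarExample {
      public static void main(String[] args) throws Exception {
        File jar = new File("/tmp/job.jar"); // hypothetical localized jar
        File toDir = new File("/tmp/jars");  // hypothetical target directory
        JarFile jarFile = new JarFile(jar);
        try {
          Enumeration<JarEntry> entries = jarFile.entries();
          while (entries.hasMoreElements()) {
            JarEntry entry = entries.nextElement();
            File out = new File(toDir, entry.getName());
            if (entry.isDirectory()) {
              out.mkdirs();
              continue;
            }
            out.getParentFile().mkdirs();
            // copy the entry's bytes to the target file
            InputStream in = jarFile.getInputStream(entry);
            FileOutputStream os = new FileOutputStream(out);
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
              os.write(buf, 0, n);
            }
            os.close();
            in.close();
          }
        } finally {
          jarFile.close();
        }
      }
    }
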
+  /**
+   * Create taskDirs on all the disks. Otherwise, in some cases, such as when
+   * LinuxTaskController is in use, the child might wish to balance load
+   * across disks but cannot itself create the attempt directory because the
+   * job directory is writable only by the TT.
+   * 
+   * @param jobId job to which the attempt belongs
+   * @param attemptId attempt whose directories are being created
+   * @param isCleanupAttempt true if this is a task-cleanup attempt
+   * @param fs the local filesystem on which to create the directories
+   * @param localDirs the configured mapred.local.dir entries
+   * @throws IOException
+   */
+  private static void initializeAttemptDirs(String jobId, String attemptId,
+      boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
+      throws IOException {
+
+    boolean initStatus = false;
+    String attemptDirPath =
+        getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
+
+    for (String localDir : localDirs) {
+      Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+      boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+      if (!attemptDirStatus) {
+        LOG.warn("localAttemptDir " + localAttemptDir.toString()
+            + " couldn't be created.");
       }
+      initStatus = initStatus || attemptDirStatus;
+    }
+
+    if (!initStatus) {
+      throw new IOException("Not able to initialize attempt directories "
+          + "in any of the configured local directories for the attempt "
+          + attemptId);
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
   }
 
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
@@ -890,7 +1170,7 @@
     for (TaskInProgress tip : tasksToClose.values()) {
       tip.jobHasFinished(false);
     }
-    
+
     this.running = false;
         
     // Clear local storage
@@ -929,6 +1209,17 @@
   }
 
   /**
+   * For testing
+   */
+  TaskTracker() {
+    server = null;
+  }
+
+  void setConf(JobConf conf) {
+    fConf = conf;
+  }
+
+  /**
    * Start with the local machine name, and the default JobTracker
    */
   public TaskTracker(JobConf conf) throws IOException {
@@ -1568,10 +1859,9 @@
       }
       
       MapOutputFile mapOutputFile = new MapOutputFile();
-      mapOutputFile.setJobId(taskId.getJobID());
       mapOutputFile.setConf(conf);
       
-      Path tmp_output =  mapOutputFile.getOutputFile(taskId);
+      Path tmp_output =  mapOutputFile.getOutputFile();
       if(tmp_output == null)
         return 0;
       FileSystem localFS = FileSystem.getLocal(conf);
@@ -1847,54 +2137,36 @@
       taskTimeout = (10 * 60 * 1000);
     }
         
-    private void localizeTask(Task task) throws IOException{
+    void localizeTask(Task task) throws IOException{
 
-      Path localTaskDir = 
-        lDirAlloc.getLocalPathForWrite(
-          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
-            task.getTaskID().toString(), task.isTaskCleanupTask()), 
-          defaultJobConf );
-      
       FileSystem localFs = FileSystem.getLocal(fConf);
-      if (!localFs.mkdirs(localTaskDir)) {
-        throw new IOException("Mkdirs failed to create " 
-                    + localTaskDir.toString());
-      }
 
-      // create symlink for ../work if it already doesnt exist
-      String workDir = lDirAlloc.getLocalPathToRead(
-                         TaskTracker.getLocalJobDir(task.getJobID().toString())
-                         + Path.SEPARATOR  
-                         + "work", defaultJobConf).toString();
-      String link = localTaskDir.getParent().toString() 
-                      + Path.SEPARATOR + "work";
-      File flink = new File(link);
-      if (!flink.exists())
-        FileUtil.symLink(workDir, link);
-      
+      // create taskDirs on all the disks.
+      initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
+          .toString(), task.isTaskCleanupTask(), localFs, fConf
+          .getStrings("mapred.local.dir"));
+
       // create the working-directory of the task 
-      Path cwd = lDirAlloc.getLocalPathForWrite(
-                   getLocalTaskDir(task.getJobID().toString(), 
-                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
-                   + Path.SEPARATOR + MRConstants.WORKDIR,
-                   defaultJobConf);
+      Path cwd =
+          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+              .toString(), task.getTaskID().toString(), task
+              .isTaskCleanupTask()), defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
       }
 
-      Path localTaskFile = new Path(localTaskDir, "job.xml");
-      task.setJobFile(localTaskFile.toString());
       localJobConf.set("mapred.local.dir",
                        fConf.get("mapred.local.dir"));
+
       if (fConf.get("slave.host.name") != null) {
         localJobConf.set("slave.host.name",
                          fConf.get("slave.host.name"));
       }
             
-      localJobConf.set("mapred.task.id", task.getTaskID().toString());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
 
+      // Do the task-type specific localization
       task.localizeConfiguration(localJobConf);
       
       List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
@@ -1927,12 +2199,6 @@
         //disable jvm reuse
         localJobConf.setNumTasksToExecutePerJvm(1);
       }
-      OutputStream out = localFs.create(localTaskFile);
-      try {
-        localJobConf.writeXml(out);
-      } finally {
-        out.close();
-      }
       task.setConf(localJobConf);
     }
         
@@ -2188,7 +2454,7 @@
                                      localJobConf). toString());
               } catch (IOException e) {
                 LOG.warn("Working Directory of the task " + task.getTaskID() +
-                		 "doesnt exist. Caught exception " +
+                                " doesn't exist. Caught exception " +
                           StringUtils.stringifyException(e));
               }
               // Build the command  
@@ -2463,34 +2729,39 @@
           if (localJobConf == null) {
             return;
           }
-          String taskDir = getLocalTaskDir(task.getJobID().toString(),
-                             taskId.toString(), task.isTaskCleanupTask());
+          String localTaskDir =
+              getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
+                  task.isTaskCleanupTask());
+          String taskWorkDir =
+              getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
+                  task.isTaskCleanupTask());
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
               //and reduce inputs get stored)
               runner.close();
             }
-            //We don't delete the workdir
-            //since some other task (running in the same JVM) 
-            //might be using the dir. The JVM running the tasks would clean
-            //the workdir per a task in the task process itself.
+
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+              // No jvm reuse, remove everything
               directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
-                  taskDir));
+                  localTaskDir));
             }  
-            
             else {
-              directoryCleanupThread.addToQueue(localFs,
-                  getLocalFiles(defaultJobConf,
-                taskDir+"/job.xml"));
+              // Jvm reuse. We don't delete the workdir since some other task
+              // (running in the same JVM) might be using the dir. The JVM
+              // running the tasks would clean the workdir per a task in the
+              // task process itself.
+              directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+                  defaultJobConf, localTaskDir + Path.SEPARATOR
+                      + TaskTracker.JOBFILE));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
               directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
-                  taskDir+"/work"));
+                  taskWorkDir));
             }  
           }
         } catch (Throwable ie) {