You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:41:32 UTC

svn commit: r1077108 [2/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ contrib/gridmix/src/test/org/apache/hadoop/...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar  4 03:41:31 2011
@@ -30,9 +30,8 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.Node;
 
 
@@ -61,7 +60,7 @@ class TaskInProgress {
 
   // Defines the TIP
   private String jobFile = null;
-  private RawSplit rawSplit;
+  private final TaskSplitMetaInfo splitInfo;
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
@@ -131,12 +130,12 @@ class TaskInProgress {
    * Constructor for MapTask
    */
   public TaskInProgress(JobID jobid, String jobFile, 
-                        RawSplit rawSplit, 
+                        TaskSplitMetaInfo split, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition,
                         int numSlotsRequired) {
     this.jobFile = jobFile;
-    this.rawSplit = rawSplit;
+    this.splitInfo = split;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -155,6 +154,7 @@ class TaskInProgress {
                         int partition, JobTracker jobtracker, JobConf conf,
                         JobInProgress job, int numSlotsRequired) {
     this.jobFile = jobFile;
+    this.splitInfo = null;
     this.numMaps = numMaps;
     this.partition = partition;
     this.jobtracker = jobtracker;
@@ -284,7 +284,7 @@ class TaskInProgress {
    * Whether this is a map task
    */
   public boolean isMapTask() {
-    return rawSplit != null;
+    return splitInfo != null;
   }
     
   /**
@@ -746,7 +746,7 @@ class TaskInProgress {
    */
   public String[] getSplitLocations() {
     if (isMapTask() && !jobSetup && !jobCleanup) {
-      return rawSplit.getLocations();
+      return splitInfo.getLocations();
     }
     return new String[0];
   }
@@ -937,19 +937,11 @@ class TaskInProgress {
     if (isMapTask()) {
       LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
           + failedRanges.getIndicesCount());
-      String splitClass = null;
-      BytesWritable split;
-      if (!jobSetup && !jobCleanup) {
-        splitClass = rawSplit.getClassName();
-        split = rawSplit.getBytes();
-      } else {
-        split = new BytesWritable();
-      }
-      t = new MapTask(jobFile, taskid, partition, splitClass, split, 
-                      numSlotsNeeded, job.getUser());
+      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
+                      numSlotsNeeded);
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps, 
-                         numSlotsNeeded, job.getUser());
+                         numSlotsNeeded);
     }
     if (jobCleanup) {
       t.setJobCleanupTask();
@@ -1060,7 +1052,7 @@ class TaskInProgress {
     if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
-    String[] splits = rawSplit.getLocations();
+    String[] splits = splitInfo.getLocations();
     Node[] nodes = new Node[splits.length];
     for (int i = 0; i < splits.length; i++) {
       nodes[i] = jobtracker.getNode(splits[i]);
@@ -1090,16 +1082,12 @@ class TaskInProgress {
 
   public long getMapInputSize() {
     if(isMapTask() && !jobSetup && !jobCleanup) {
-      return rawSplit.getDataLength();
+      return splitInfo.getInputDataLength();
     } else {
       return 0;
     }
   }
   
-  public void clearSplit() {
-    rawSplit.clearBytes();
-  }
-  
   /**
    * This class keeps the records to be skipped during further executions 
    * based on failed records from all the previous attempts.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:41:31 2011
@@ -84,6 +84,8 @@ import org.apache.hadoop.metrics.Updater
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -474,6 +476,13 @@ public class TaskTracker 
 	return taskDir;
   }
 
+  private void setUgi(String user, Configuration conf) {
+    //The dummy-group used here will not be required once we have UGI
+    //object creation with just the user name.
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        user+","+UnixUserGroupInformation.DEFAULT_GROUP);
+  }
+  
   String getPid(TaskAttemptID tid) {
     TaskInProgress tip = tasks.get(tid);
     if (tip != null) {
@@ -833,12 +842,16 @@ public class TaskTracker 
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     Path jobFile = new Path(t.getJobFile());
+    String userName = t.getUser();
+    JobConf userConf = new JobConf(getJobConf());
+    setUgi(userName, userConf);
+    FileSystem userFs = jobFile.getFileSystem(userConf);
     // Get sizes of JobFile and JarFile
     // sizes are -1 if they are not present.
     FileStatus status = null;
     long jobFileSize = -1;
     try {
-      status = systemFS.getFileStatus(jobFile);
+      status = userFs.getFileStatus(jobFile);
       jobFileSize = status.getLen();
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
@@ -864,7 +877,7 @@ public class TaskTracker 
             throw new IOException("Not able to create job directory "
                                   + jobDir.toString());
         }
-        systemFS.copyToLocalFile(jobFile, localJobFile);
+        userFs.copyToLocalFile(jobFile, localJobFile);
         JobConf localJobConf = new JobConf(localJobFile);
         
         // create the 'work' directory
@@ -885,7 +898,7 @@ public class TaskTracker 
         if (jarFile != null) {
           Path jarFilePath = new Path(jarFile);
           try {
-            status = systemFS.getFileStatus(jarFilePath);
+            status = userFs.getFileStatus(jarFilePath);
             jarFileSize = status.getLen();
           } catch(FileNotFoundException fe) {
             jarFileSize = -1;
@@ -899,7 +912,7 @@ public class TaskTracker 
           if (!localFs.mkdirs(localJarFile.getParent())) {
             throw new IOException("Mkdirs failed to create jars directory "); 
           }
-          systemFS.copyToLocalFile(jarFilePath, localJarFile);
+          userFs.copyToLocalFile(jarFilePath, localJarFile);
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
           try {

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Fri Mar  4 03:41:31 2011
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.Configuration;
+/**
+ * A utility to manage job submission files.
+ */
+public class JobSubmissionFiles {
+
+  // job submission directory is private!
+  final public static FsPermission JOB_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0700); // rwx--------
+  // job files are world-readable and owner-writable
+  final public static FsPermission JOB_FILE_PERMISSION = 
+    FsPermission.createImmutable((short) 0644); // rw-r--r--
+  
+  public static Path getJobSplitFile(Path jobSubmissionDir) {
+    return new Path(jobSubmissionDir, "job.split");
+  }
+
+  public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
+    return new Path(jobSubmissionDir, "job.splitmetainfo");
+  }
+  
+  /**
+   * Get the job conf path.
+   */
+  public static Path getJobConfPath(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "job.xml");
+  }
+    
+  /**
+   * Get the job jar path.
+   */
+  public static Path getJobJar(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "job.jar");
+  }
+  
+  /**
+   * Get the job distributed cache files path.
+   * @param jobSubmitDir
+   */
+  public static Path getJobDistCacheFiles(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "files");
+  }
+  /**
+   * Get the job distributed cache archives path.
+   * @param jobSubmitDir 
+   */
+  public static Path getJobDistCacheArchives(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "archives");
+  }
+  /**
+   * Get the job distributed cache libjars path.
+   * @param jobSubmitDir 
+   */
+  public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "libjars");
+  }
+
+  /**
+   * Initializes the staging directory and returns the path. It also
+   * keeps track of all necessary ownership & permissions
+   * @param client the job client used to look up the staging area directory
+   * @param conf
+   */
+  public static Path getStagingDir(JobClient client, Configuration conf) 
+  throws IOException {
+    Path stagingArea = client.getStagingAreaDir();
+    FileSystem fs = stagingArea.getFileSystem(conf);
+    String realUser;
+    String currentUser;
+    try {
+      UserGroupInformation ugi = UnixUserGroupInformation.login();
+      realUser = ugi.getUserName();
+      ugi = UnixUserGroupInformation.login(conf);
+      currentUser = ugi.getUserName();
+    } catch (LoginException le) {
+      throw new IOException(le);
+    }
+    if (fs.exists(stagingArea)) {
+      FileStatus fsStatus = fs.getFileStatus(stagingArea);
+      String owner = fsStatus.getOwner();
+      if (!(owner.equals(currentUser) || owner.equals(realUser)) || 
+          !fsStatus.getPermission().
+                               equals(JOB_DIR_PERMISSION)) {
+         throw new IOException("The ownership/permissions on the staging " +
+                      "directory " + stagingArea + " is not as expected. " + 
+                      "It is owned by " + owner + " and permissions are "+ 
+                      fsStatus.getPermission() + ". The directory must " +
+                      "be owned by the submitter " + currentUser + " or " +
+                      "by " + realUser + " and permissions must be rwx------");
+      }
+    } else {
+      fs.mkdirs(stagingArea, 
+          new FsPermission(JOB_DIR_PERMISSION));
+    }
+    return stagingArea;
+  }
+  
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java Fri Mar  4 03:41:31 2011
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This class groups the fundamental classes associated with
+ * reading/writing splits. The split information is divided into
+ * two parts based on the consumer of the information. The two
+ * parts are the split meta information, and the raw split 
+ * information. The first part is consumed by the JobTracker to
+ * create the tasks' locality data structures. The second part is
+ * used by the maps at runtime to know what to do!
+ * These pieces of information are written to two separate files.
+ * The metainformation file is slurped by the JobTracker during 
+ * job initialization. A map task gets the meta information during
+ * the launch and it reads the raw split bytes directly from the 
+ * file.
+ */
+public class JobSplit {
+  static final int META_SPLIT_VERSION = 1;
+  static final byte[] META_SPLIT_FILE_HEADER;
+  static {
+    try {
+      META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
+    } catch (UnsupportedEncodingException u) {
+      throw new RuntimeException(u);
+    }
+  } 
+  public static final TaskSplitMetaInfo EMPTY_TASK_SPLIT = 
+    new TaskSplitMetaInfo();
+  
+  /**
+   * This represents the meta information about the task split.
+   * The main fields are 
+   *     - start offset in actual split
+   *     - data length that will be processed in this split
+   *     - hosts on which this split is local
+   */
+  public static class SplitMetaInfo implements Writable {
+    private long startOffset;
+    private long inputDataLength;
+    private String[] locations;
+
+    public SplitMetaInfo() {}
+    
+    public SplitMetaInfo(String[] locations, long startOffset, 
+        long inputDataLength) {
+      this.locations = locations;
+      this.startOffset = startOffset;
+      this.inputDataLength = inputDataLength;
+    }
+    
+    public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
+      try {
+        this.locations = split.getLocations();
+        this.inputDataLength = split.getLength();
+        this.startOffset = startOffset;
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    
+    public String[] getLocations() {
+      return locations;
+    }
+  
+    public long getStartOffset() {
+      return startOffset;
+    }
+      
+    public long getInputDataLength() {
+      return inputDataLength;
+    }
+    
+    public void setInputDataLocations(String[] locations) {
+      this.locations = locations;
+    }
+    
+    public void setInputDataLength(long length) {
+      this.inputDataLength = length;
+    }
+    
+    public void readFields(DataInput in) throws IOException {
+      int len = WritableUtils.readVInt(in);
+      locations = new String[len];
+      for (int i = 0; i < locations.length; i++) {
+        locations[i] = Text.readString(in);
+      }
+      startOffset = WritableUtils.readVLong(in);
+      inputDataLength = WritableUtils.readVLong(in);
+    }
+  
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVInt(out, locations.length);
+      for (int i = 0; i < locations.length; i++) {
+        Text.writeString(out, locations[i]);
+      }
+      WritableUtils.writeVLong(out, startOffset);
+      WritableUtils.writeVLong(out, inputDataLength);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("data-size : " + inputDataLength + "\n");
+      buf.append("start-offset : " + startOffset + "\n");
+      buf.append("locations : " + "\n");
+      for (String loc : locations) {
+        buf.append("  " + loc + "\n");
+      }
+      return buf.toString();
+    }
+  }
+  /**
+   * This represents the meta information about the task split that the 
+   * JobTracker creates
+   */
+  public static class TaskSplitMetaInfo {
+    private TaskSplitIndex splitIndex;
+    private long inputDataLength;
+    private String[] locations;
+    public TaskSplitMetaInfo(){
+      this.splitIndex = new TaskSplitIndex();
+      this.locations = new String[0];
+    }
+    public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations, 
+        long inputDataLength) {
+      this.splitIndex = splitIndex;
+      this.locations = locations;
+      this.inputDataLength = inputDataLength;
+    }
+    public TaskSplitMetaInfo(InputSplit split, long startOffset) 
+    throws InterruptedException, IOException {
+      this(new TaskSplitIndex("", startOffset), split.getLocations(), 
+          split.getLength());
+    }
+    
+    public TaskSplitMetaInfo(String[] locations, long startOffset, 
+        long inputDataLength) {
+      this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
+    }
+    
+    public TaskSplitIndex getSplitIndex() {
+      return splitIndex;
+    }
+    
+    public String getSplitLocation() {
+      return splitIndex.getSplitLocation();
+    }
+    public long getInputDataLength() {
+      return inputDataLength;
+    }
+    public String[] getLocations() {
+      return locations;
+    }
+    public long getStartOffset() {
+      return splitIndex.getStartOffset();
+    }
+  }
+  
+  /**
+   * This represents the meta information about the task split that the 
+   * task gets
+   */
+  public static class TaskSplitIndex {
+    private String splitLocation;
+    private long startOffset;
+    public TaskSplitIndex(){
+      this("", 0);
+    }
+    public TaskSplitIndex(String splitLocation, long startOffset) {
+      this.splitLocation = splitLocation;
+      this.startOffset = startOffset;
+    }
+    public long getStartOffset() {
+      return startOffset;
+    }
+    public String getSplitLocation() {
+      return splitLocation;
+    }
+    public void readFields(DataInput in) throws IOException {
+      splitLocation = Text.readString(in);
+      startOffset = WritableUtils.readVLong(in);
+    }
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, splitLocation);
+      WritableUtils.writeVLong(out, startOffset);
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Fri Mar  4 03:41:31 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+
+/**
+ * The class that is used by the Job clients to write splits (both the meta
+ * and the raw bytes parts)
+ */
+public class JobSplitWriter {
+
+  private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
+  private static final byte[] SPLIT_FILE_HEADER;
+  static {
+    try {
+      SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
+    } catch (UnsupportedEncodingException u) {
+      throw new RuntimeException(u);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
+      Configuration conf, List<InputSplit> splits) 
+  throws IOException, InterruptedException {
+    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+    createSplitFiles(jobSubmitDir, conf, array);
+  }
+  
+  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
+      Configuration conf,T[] splits) 
+  throws IOException, InterruptedException {
+    FileSystem fs = jobSubmitDir.getFileSystem(conf);
+    FSDataOutputStream out = createFile(fs, 
+        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
+    out.close();
+    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
+        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+        info);
+  }
+  
+  public static void createSplitFiles(Path jobSubmitDir, 
+      Configuration conf, org.apache.hadoop.mapred.InputSplit[] splits) 
+  throws IOException {
+    FileSystem fs = jobSubmitDir.getFileSystem(conf);
+    FSDataOutputStream out = createFile(fs, 
+        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+    SplitMetaInfo[] info = writeOldSplits(splits, out);
+    out.close();
+    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
+        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+        info);
+  }
+  
+  private static FSDataOutputStream createFile(FileSystem fs, Path splitFile, 
+      Configuration job)  throws IOException {
+    FSDataOutputStream out = FileSystem.create(fs, splitFile, 
+        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+    int replication = job.getInt("mapred.submit.replication", 10);
+    fs.setReplication(splitFile, (short)replication);
+    writeSplitHeader(out);
+    return out;
+  }
+  private static void writeSplitHeader(FSDataOutputStream out) 
+  throws IOException {
+    out.write(SPLIT_FILE_HEADER);
+    out.writeInt(splitVersion);
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static <T extends InputSplit> 
+  SplitMetaInfo[] writeNewSplits(Configuration conf, 
+      T[] array, FSDataOutputStream out)
+  throws IOException, InterruptedException {
+
+    SplitMetaInfo[] info = new SplitMetaInfo[array.length];
+    if (array.length != 0) {
+      SerializationFactory factory = new SerializationFactory(conf);
+      int i = 0;
+      long offset = out.size();
+      for(T split: array) {
+        int prevCount = out.size();
+        Text.writeString(out, split.getClass().getName());
+        Serializer<T> serializer = 
+          factory.getSerializer((Class<T>) split.getClass());
+        serializer.open(out);
+        serializer.serialize(split);
+        int currCount = out.size();
+        info[i++] = 
+          new JobSplit.SplitMetaInfo( 
+              split.getLocations(), offset,
+              split.getLength());
+        offset += currCount - prevCount;
+      }
+    }
+    return info;
+  }
+  
+  private static SplitMetaInfo[] writeOldSplits(
+      org.apache.hadoop.mapred.InputSplit[] splits,
+      FSDataOutputStream out) throws IOException {
+    SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
+    if (splits.length != 0) {
+      int i = 0;
+      long offset = out.size();
+      for(org.apache.hadoop.mapred.InputSplit split: splits) {
+        int prevLen = out.size();
+        Text.writeString(out, split.getClass().getName());
+        split.write(out);
+        int currLen = out.size();
+        info[i++] = new JobSplit.SplitMetaInfo( 
+            split.getLocations(), offset,
+            split.getLength());
+        offset += currLen - prevLen;
+      }
+    }
+    return info;
+  }
+
+  private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, 
+      FsPermission p, int splitMetaInfoVersion, 
+      JobSplit.SplitMetaInfo[] allSplitMetaInfo) 
+  throws IOException {
+    // write the splits meta-info to a file for the job tracker
+    FSDataOutputStream out = 
+      FileSystem.create(fs, filename, p);
+    out.write(JobSplit.META_SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(out, splitMetaInfoVersion);
+    WritableUtils.writeVInt(out, allSplitMetaInfo.length);
+    for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
+      splitMetaInfo.write(out);
+    }
+    out.close();
+  }
+}
+

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Fri Mar  4 03:41:31 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+
+/**
+ * An (internal) utility that reads the split meta info and creates
+ * split meta info objects
+ */
+
+public class SplitMetaInfoReader {
+  
+  public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
+      JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
+  throws IOException {
+    long maxMetaInfoSize = conf.getLong("mapreduce.job.split.metainfo.maxsize", 
+        10000000L);
+    Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
+    FileStatus fStatus = fs.getFileStatus(metaSplitFile);
+    if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+      throw new IOException("Split metadata size exceeded " +
+          maxMetaInfoSize +". Aborting job " + jobId);
+    }
+    FSDataInputStream in = fs.open(metaSplitFile);
+    byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != JobSplit.META_SPLIT_VERSION) {
+      in.close();
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
+    JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
+      new JobSplit.TaskSplitMetaInfo[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+      splitMetaInfo.readFields(in);
+      JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+          JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(), 
+          splitMetaInfo.getStartOffset());
+      allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, 
+          splitMetaInfo.getLocations(), 
+          splitMetaInfo.getInputDataLength());
+    }
+    in.close();
+    return allSplitMetaInfo;
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml Fri Mar  4 03:41:31 2011
@@ -91,11 +91,11 @@
     <test> <!-- TESTED -->
       <description>ls: directory using absolute path</description>
       <test-commands>
-        <command>-fs NAMENODE -mkdir /dir1</command>
-        <command>-fs NAMENODE -ls /</command>
+        <command>-fs NAMENODE -mkdir /dir1/dir2</command>
+        <command>-fs NAMENODE -ls /dir1</command>
       </test-commands>
       <cleanup-commands>
-        <command>-fs NAMENODE -rmr /dir1</command>
+        <command>-fs NAMENODE -rmr /dir1/dir2</command>
       </cleanup-commands>
       <comparators>
         <comparator>
@@ -104,7 +104,7 @@
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^drwxr-xr-x( )*-( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/dir1</expected-output>
+          <expected-output>^drwxr-xr-x( )*-( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/dir1/dir2</expected-output>
         </comparator>
       </comparators>
     </test>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar  4 03:41:31 2011
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -113,10 +114,10 @@ public class ClusterWithLinuxTaskControl
     String[] splits = ugi.split(",");
     taskControllerUser = new UnixUserGroupInformation(splits);
     clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
-    createHomeDirectory(clusterConf);
+    createHomeAndStagingDirectory(clusterConf);
   }
 
-  private void createHomeDirectory(JobConf conf)
+  private void createHomeAndStagingDirectory(JobConf conf)
       throws IOException {
     FileSystem fs = dfsCluster.getFileSystem();
     String path = "/user/" + taskControllerUser.getUserName();
@@ -124,6 +125,12 @@ public class ClusterWithLinuxTaskControl
     LOG.info("Creating Home directory : " + homeDirectory);
     fs.mkdirs(homeDirectory);
     changePermission(conf, homeDirectory);
+    Path stagingArea = 
+      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+          "/tmp/hadoop/mapred/staging"));
+    LOG.info("Creating Staging root directory : " + stagingArea);
+    fs.mkdirs(stagingArea);
+    fs.setPermission(stagingArea, new FsPermission((short)0777));
   }
 
   private void changePermission(JobConf conf, Path p)

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Fri Mar  4 03:41:31 2011
@@ -135,14 +135,14 @@ public class GenericMRLoadGenerator exte
       confRandom(job);
     } else if (null != job.getClass("mapred.indirect.input.format", null)) {
       // specified IndirectInputFormat? Build src list
-      JobClient jClient = new JobClient(job);  
-      Path sysdir = jClient.getSystemDir();
+      JobClient jClient = new JobClient(job);
+      Path tmpDir = new Path(jClient.getFs().getHomeDirectory(), ".staging");
       Random r = new Random();
-      Path indirInputFile = new Path(sysdir,
+      Path indirInputFile = new Path(tmpDir,
           Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
       job.set("mapred.indirect.input.file", indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(
-          sysdir.getFileSystem(job), job, indirInputFile,
+          tmpDir.getFileSystem(job), job, indirInputFile,
           LongWritable.class, Text.class,
           SequenceFile.CompressionType.NONE);
       try {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar  4 03:41:31 2011
@@ -107,7 +107,7 @@ public class TestJobHistory extends Test
     boolean isJobLaunched;
     boolean isJTRestarted;
 
-    TestListener(JobInfo job) {
+    TestListener(JobHistory.JobInfo job) {
       super(job);
       lineNum = 0;
       isJobLaunched = false;
@@ -293,7 +293,7 @@ public class TestJobHistory extends Test
   }
 
   // Validate Format of Task Level Keys, Values read from history file
-  private static void validateTaskLevelKeyValuesFormat(JobInfo job,
+  private static void validateTaskLevelKeyValuesFormat(JobHistory.JobInfo job,
                                   boolean splitsCanBeEmpty) {
     Map<String, JobHistory.Task> tasks = job.getAllTasks();
 
@@ -340,7 +340,7 @@ public class TestJobHistory extends Test
   }
 
   // Validate foramt of Task Attempt Level Keys, Values read from history file
-  private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
+  private static void validateTaskAttemptLevelKeyValuesFormat(JobHistory.JobInfo job) {
     Map<String, JobHistory.Task> tasks = job.getAllTasks();
 
     // For each task
@@ -515,7 +515,7 @@ public class TestJobHistory extends Test
   // Validate Job Level Keys, Values read from history file by
   // comparing them with the actual values from JT.
   private static void validateJobLevelKeyValues(MiniMRCluster mr,
-          RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException  {
+          RunningJob job, JobHistory.JobInfo jobInfo, JobConf conf) throws IOException  {
 
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
@@ -543,11 +543,11 @@ public class TestJobHistory extends Test
                values.get(Keys.JOB_PRIORITY)));
 
     assertTrue("Job Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getJobName(conf).equals(
+               "match the expected value", JobHistory.JobInfo.getJobName(conf).equals(
                values.get(Keys.JOBNAME)));
 
     assertTrue("User Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getUserName(conf).equals(
+               "match the expected value", JobHistory.JobInfo.getUserName(conf).equals(
                values.get(Keys.USER)));
 
     // Validate job counters
@@ -594,7 +594,7 @@ public class TestJobHistory extends Test
   // Validate Task Level Keys, Values read from history file by
   // comparing them with the actual values from JT.
   private static void validateTaskLevelKeyValues(MiniMRCluster mr,
-                      RunningJob job, JobInfo jobInfo) throws IOException  {
+                      RunningJob job, JobHistory.JobInfo jobInfo) throws IOException  {
 
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
@@ -676,7 +676,7 @@ public class TestJobHistory extends Test
   // Validate Task Attempt Level Keys, Values read from history file by
   // comparing them with the actual values from JT.
   private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
-                      RunningJob job, JobInfo jobInfo) throws IOException  {
+                      RunningJob job, JobHistory.JobInfo jobInfo) throws IOException  {
 
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java Fri Mar  4 03:41:31 2011
@@ -37,9 +37,9 @@ public class TestJobHistoryParsing  exte
    * object with data from log file. 
    */
   static class TestListener implements Listener {
-    JobInfo job;
+    JobHistory.JobInfo job;
 
-    TestListener(JobInfo job) {
+    TestListener(JobHistory.JobInfo job) {
       this.job = job;
     }
     // JobHistory.Listener implementation 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar  4 03:41:31 2011
@@ -28,6 +28,7 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 public class TestJobQueueTaskScheduler extends TestCase {
   
@@ -77,8 +78,8 @@ public class TestJobQueueTaskScheduler e
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1, 
-                              getJobConf().getUser()) {
+      Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(),
+          1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -93,8 +94,7 @@ public class TestJobQueueTaskScheduler e
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10, 1, 
-                                 getJobConf().getUser()) {
+      Task task = new ReduceTask("", attemptId, 0, 10, 1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Fri Mar  4 03:41:31 2011
@@ -55,7 +55,8 @@ public class TestJobSysDirWithDFS extend
                                            Path outDir,
                                            String input,
                                            int numMaps,
-                                           int numReduces) throws IOException {
+                                           int numReduces,
+                                           String sysDir) throws IOException {
     FileSystem inFs = inDir.getFileSystem(conf);
     FileSystem outFs = outDir.getFileSystem(conf);
     outFs.delete(outDir, true);
@@ -88,14 +89,15 @@ public class TestJobSysDirWithDFS extend
     // Checking that the Job Client system dir is not used
     assertFalse(FileSystem.get(conf).exists(new Path(conf.get("mapred.system.dir")))); 
     // Check if the Job Tracker system dir is propogated to client
-    String sysDir = jobClient.getSystemDir().toString();
+    sysDir = jobClient.getSystemDir().toString();
     System.out.println("Job sys dir -->" + sysDir);
     assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
     assertTrue(sysDir.contains("custom"));
     return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
   }
 
- static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+ static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir)
+ throws IOException {
     LOG.info("runWordCount");
     // Run a word count example
     // Keeping tasks that match this pattern
@@ -105,7 +107,7 @@ public class TestJobSysDirWithDFS extend
     result = launchWordCount(jobConf, inDir, outDir,
                              "The quick brown fox\nhas many silly\n" + 
                              "red fox sox\n",
-                             3, 1);
+                             3, 1, sysDir);
     assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                  "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
     // Checking if the Job ran successfully in spite of different system dir config
@@ -126,7 +128,7 @@ public class TestJobSysDirWithDFS extend
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
 
-      runWordCount(mr, mr.createJobConf());
+      runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar  4 03:41:31 2011
@@ -27,12 +27,15 @@ import junit.framework.TestCase;
 import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
-
+import org.junit.*;
 /** 
  * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker 
  * should be able to continue running the previously running jobs and also 
  * recover previosuly submitted jobs.
  */
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
 public class TestJobTrackerRestart extends TestCase {
   static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Fri Mar  4 03:41:31 2011
@@ -24,11 +24,15 @@ import org.apache.hadoop.mapred.TestJobT
 
 import junit.framework.TestCase;
 import java.io.*;
+import org.junit.*;
 
 /** 
  * This test checks if the jobtracker can detect and recover a tracker that was
  * lost while the jobtracker was down.
  */
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
 public class TestJobTrackerRestartWithLostTracker extends TestCase {
   final Path testDir = new Path("/jt-restart-lost-tt-testing");
   final Path inDir = new Path(testDir, "input");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Fri Mar  4 03:41:31 2011
@@ -27,12 +27,16 @@ import junit.framework.TestCase;
 import java.io.*;
 import java.util.HashSet;
 import java.util.Set;
+import org.junit.*;
 
 /** 
  * This test checks jobtracker in safe mode. In safe mode the jobtracker upon 
  * restart doesnt schedule any new tasks and waits for the (old) trackers to 
  * join back.
  */
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
 public class TestJobTrackerSafeMode extends TestCase {
   final Path testDir = 
     new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Fri Mar  4 03:41:31 2011
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
+import java.net.URI;
+
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -35,15 +37,13 @@ import org.apache.hadoop.io.Text;
 public class TestMiniMRClasspath extends TestCase {
   
   
-  static String launchWordCount(String fileSys,
+  static void configureWordCount(FileSystem fs,
                                 String jobTracker,
                                 JobConf conf,
                                 String input,
                                 int numMaps,
-                                int numReduces) throws IOException {
-    final Path inDir = new Path("/testing/wc/input");
-    final Path outDir = new Path("/testing/wc/output");
-    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+                                int numReduces,
+                                Path inDir, Path outDir) throws IOException {
     fs.delete(outDir, true);
     if (!fs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -53,7 +53,7 @@ public class TestMiniMRClasspath extends
       file.writeBytes(input);
       file.close();
     }
-    FileSystem.setDefaultUri(conf, fileSys);
+    FileSystem.setDefaultUri(conf, fs.getUri());
     conf.set("mapred.job.tracker", jobTracker);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
@@ -72,6 +72,16 @@ public class TestMiniMRClasspath extends
     conf.setNumReduceTasks(numReduces);
     //pass a job.jar already included in the hadoop build
     conf.setJar("build/test/testjar/testjob.jar");
+  }
+
+  static String launchWordCount(String fileSys, String jobTracker, JobConf conf,
+                                String input, int numMaps, int numReduces)
+  throws IOException {
+    final Path inDir = new Path("/testing/wc/input");
+    final Path outDir = new Path("/testing/wc/output");
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir,
+                       outDir);
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar  4 03:41:31 2011
@@ -210,8 +210,10 @@ public class TestMiniMRWithDFS extends T
     long hdfsWrite = 
       counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
           Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    long rawSplitBytesRead =
+      counters.findCounter(Task.Counter.SPLIT_RAW_BYTES).getCounter();
     assertEquals(result.output.length(), hdfsWrite);
-    assertEquals(input.length(), hdfsRead);
+    assertEquals(input.length() + rawSplitBytesRead, hdfsRead);
 
     // Run a job with input and output going to localfs even though the 
     // default fs is hdfs.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Fri Mar  4 03:41:31 2011
@@ -23,6 +23,8 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -43,8 +45,10 @@ public class TestMiniMRWithDFSWithDistin
   }
   
   static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
-    JobConf jobconf = mr.createJobConf();
-    UnixUserGroupInformation.saveToConf(jobconf,
+    return createJobConf(mr.createJobConf(), ugi);
+  }
+  static JobConf createJobConf(JobConf conf, UnixUserGroupInformation ugi) {
+    JobConf jobconf = new JobConf(conf);    UnixUserGroupInformation.saveToConf(jobconf,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
     return jobconf;
   }
@@ -55,6 +59,50 @@ public class TestMiniMRWithDFSWithDistin
     fs.setPermission(p, new FsPermission((short)0777));
   }
 
+  // Submits 'job' via a job-submission client obtained for user 'ugi' and returns a RunningJob handle for it
+  RunningJob runJobAsUser(JobConf job, UserGroupInformation ugi)
+  throws Exception {
+    JobSubmissionProtocol jobSubmitClient =
+      TestSubmitJob.getJobSubmitClient(job, ugi);
+    JobID id = jobSubmitClient.getNewJobId();
+    // compute the splits and upload them, with the job conf, to a submit dir named after the job id
+    InputSplit[] splits = computeJobSplit(JobID.downgrade(id), job);
+    Path jobSubmitDir = new Path(id.toString());
+    FileSystem fs = jobSubmitDir.getFileSystem(job);
+    jobSubmitDir = jobSubmitDir.makeQualified(fs);
+    uploadJobFiles(JobID.downgrade(id), splits, jobSubmitDir, job);
+    // submit the job id together with the qualified submit dir through the client
+    jobSubmitClient.submitJob(id, jobSubmitDir.toString());
+    // look the submitted job up again through a regular JobClient built from the same conf
+    JobClient jc = new JobClient(job);
+    return jc.getJob(JobID.downgrade(id));
+  }
+ 
+  // Asks the job's InputFormat for its splits and sets the number of map tasks to the split count; 'id' is not used here
+  private InputSplit[] computeJobSplit(JobID id, JobConf conf)
+  throws IOException {
+    InputSplit[] splits =
+      conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
+    conf.setNumMapTasks(splits.length);
+    return splits;
+  }
+
+
+  // Writes the split files (via JobSplitWriter) and the job conf XML to the conf path derived from jobSubmitDir; 'id' is not used here
+  private void uploadJobFiles(JobID id, InputSplit[] splits,
+                             Path jobSubmitDir, JobConf conf)
+  throws IOException {
+    Path confLocation = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
+    FileSystem fs = confLocation.getFileSystem(conf);
+    FsPermission perm = new FsPermission((short)0700);
+    // NOTE(review): confOut is not closed if writeXml throws; a try/finally would be safer
+    // write the job configuration as XML to confLocation, creating the file with permission 0700
+    DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
+    conf.writeXml(confOut);
+    confOut.close();
+  }
+ 
   public void testDistinctUsers() throws Exception {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -72,9 +120,21 @@ public class TestMiniMRWithDFSWithDistin
       mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
            1, null, null, MR_UGI);
 
-      JobConf pi = createJobConf(mr, PI_UGI);
-      TestMiniMRWithDFS.runPI(mr, pi);
-
+      String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+      JobConf job1 = mr.createJobConf();
+      String input = "The quick brown fox\nhas many silly\n"
+                     + "red fox sox\n";
+      Path inDir = new Path("/testing/distinct/input");
+      Path outDir = new Path("/testing/distinct/output");
+      TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
+                                             input, 2, 1, inDir, outDir);
+      JobConf job2 = mr.createJobConf();
+      Path inDir2 = new Path("/testing/distinct/input2");
+      Path outDir2 = new Path("/testing/distinct/output2");
+      TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2,
+                                             input, 2, 1, inDir2, outDir2);
+      job2 = createJobConf(job2, WC_UGI);
+      runJobAsUser(job2, WC_UGI);
       JobConf wc = createJobConf(mr, WC_UGI);
       TestMiniMRWithDFS.runWordCount(mr, wc);
     } finally {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java Fri Mar  4 03:41:31 2011
@@ -387,82 +387,4 @@ public class TestNodeRefresh extends Tes
     
     stopCluster();
   }
-
-  /** 
-    * Check if excluded hosts are decommissioned across restart   
-    */ 
-   public void testMRExcludeHostsAcrossRestarts() throws IOException { 
-     // start a cluster with 2 hosts and empty exclude-hosts file 
-     Configuration conf = new Configuration(); 
-     conf.setBoolean("mapred.jobtracker.restart.recover", true); 
-  
-     File file = new File("hosts.exclude"); 
-     file.delete(); 
-     startCluster(2, 1, 0, conf); 
-     String hostToDecommission = getHostname(1); 
-     conf = mr.createJobConf(new JobConf(conf)); 
-  
-     // submit a job 
-     Path inDir = new Path("input"); 
-     Path outDir = new Path("output"); 
-     Path signalFilename = new Path("share"); 
-     JobConf newConf = new JobConf(conf); 
-     UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,  
-         "restart-decommission", signalFilename.toString(),  
-         signalFilename.toString()); 
-      
-     JobClient jobClient = new JobClient(newConf); 
-     RunningJob job = jobClient.submitJob(newConf); 
-     JobID id = job.getID(); 
-      
-     // wait for 50% 
-     while (job.mapProgress() < 0.5f) { 
-       UtilsForTests.waitFor(100); 
-     } 
-      
-     // change the exclude-hosts file to include one host 
-     FileOutputStream out = new FileOutputStream(file); 
-     LOG.info("Writing excluded nodes to log file " + file.toString()); 
-     BufferedWriter writer = null; 
-     try { 
-       writer = new BufferedWriter(new OutputStreamWriter(out)); 
-       writer.write( hostToDecommission + "\n"); // decommission first host 
-     } finally { 
-       if (writer != null) { 
-         writer.close(); 
-       } 
-       out.close(); 
-     } 
-     file.deleteOnExit(); 
-  
-     // restart the jobtracker 
-     mr.stopJobTracker(); 
-     mr.startJobTracker(); 
-     // Wait for the JT to be ready 
-     UtilsForTests.waitForJobTracker(jobClient); 
-  
-     jt = mr.getJobTrackerRunner().getJobTracker(); 
-     UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),  
-         signalFilename.toString(), signalFilename.toString(), 1); 
-  
-     assertTrue("Decommissioning of tracker has no effect restarted job",  
-         jt.getJob(job.getID()).failedMapTasks > 0); 
-      
-     // check the cluster status and tracker size 
-     assertEquals("Tracker is not lost upon host decommissioning",  
-                  1, jt.getClusterStatus(false).getTaskTrackers()); 
-     assertEquals("Excluded node count is incorrect",  
-                  1, jt.getClusterStatus(false).getNumExcludedNodes()); 
-      
-     // check if the host is disallowed 
-     for (TaskTrackerStatus status : jt.taskTrackers()) { 
-       assertFalse("Tracker from decommissioned host still exist",  
-                   status.getHost().equals(hostToDecommission)); 
-     } 
-  
-     // wait for the job 
-     job.waitForCompletion(); 
-  
-     stopCluster(); 
-   } 
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Mar  4 03:41:31 2011
@@ -480,7 +480,10 @@ public class TestQueueManager extends Te
 
       //try to kill as self
       try {
-        rjob.killJob();
+        conf.set("mapred.job.tracker", "localhost:"
+            + miniMRCluster.getJobTrackerPort());
+        JobClient jc = new JobClient(conf);
+        jc.getJob(rjob.getJobID()).killJob();
         if (!shouldSucceed) {
           fail("should fail kill operation");  
         }
@@ -519,7 +522,10 @@ public class TestQueueManager extends Te
       
       // try to change priority as self
       try {
-        rjob.setJobPriority("VERY_LOW");
+        conf.set("mapred.job.tracker", "localhost:"
+            + miniMRCluster.getJobTrackerPort());
+        JobClient jc = new JobClient(conf);
+        jc.getJob(rjob.getJobID()).setJobPriority("VERY_LOW");
         if (!shouldSucceed) {
           fail("changing priority should fail.");
         }
@@ -546,6 +552,9 @@ public class TestQueueManager extends Te
   private void setUpCluster(JobConf conf) throws IOException {
     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = miniDFSCluster.getFileSystem();
+    TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys,
+        conf.get("mapreduce.jobtracker.staging.root.dir",
+            "/tmp/hadoop/mapred/staging"));
     String namenode = fileSys.getUri().toString();
     miniMRCluster = new MiniMRCluster(1, namenode, 3, 
                       null, null, conf);
@@ -596,7 +605,7 @@ public class TestQueueManager extends Te
     if (shouldComplete) {
       rJob = JobClient.runJob(jc);  
     } else {
-      rJob = new JobClient(clientConf).submitJob(jc);
+      rJob = new JobClient(jc).submitJob(jc);
     }
     return rJob;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Mar  4 03:41:31 2011
@@ -29,16 +29,19 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.*;
 
 /**
  * Test whether the {@link RecoveryManager} is able to tolerate job-recovery 
  * failures and the jobtracker is able to tolerate {@link RecoveryManager}
  * failure.
  */
+/* Until MAPREDUCE-873 is backported, the recovery manager tests are not run.
+ */
+@Ignore
 public class TestRecoveryManager extends TestCase {
   private static final Log LOG = 
     LogFactory.getLog(TestRecoveryManager.class);
@@ -51,8 +54,7 @@ public class TestRecoveryManager extends
    * {@link JobTracker.RecoveryManager}. It does the following :
    *  - submits 2 jobs
    *  - kills the jobtracker
-   *  - Garble job.xml for one job causing it to fail in constructor 
-   *    and job.split for another causing it to fail in init.
+   *  - deletes the info file for one job
    *  - restarts the jobtracker
    *  - checks if the jobtraker starts normally
    */
@@ -106,26 +108,15 @@ public class TestRecoveryManager extends
     
     // delete the job.xml of job #1 causing the job to fail in constructor
     Path jobFile = 
-      new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
-    LOG.info("Deleting job.xml file : " + jobFile.toString());
+      new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
+    LOG.info("Deleting job token file : " + jobFile.toString());
     fs.delete(jobFile, false); // delete the job.xml file
     
-    // create the job.xml file with 0 bytes
+    // recreate the job info file with a single byte of junk content
     FSDataOutputStream out = fs.create(jobFile);
     out.write(1);
     out.close();
 
-    // delete the job.split of job #2 causing the job to fail in initTasks
-    Path jobSplitFile = 
-      new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
-    LOG.info("Deleting job.split file : " + jobSplitFile.toString());
-    fs.delete(jobSplitFile, false); // delete the job.split file
-    
-    // create the job.split file with 0 bytes
-    out = fs.create(jobSplitFile);
-    out.write(1);
-    out.close();
-
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                       true);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Fri Mar  4 03:41:31 2011
@@ -18,7 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import junit.framework.TestCase;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 public class TestResourceEstimation extends TestCase {
   
@@ -45,8 +45,8 @@ public class TestResourceEstimation exte
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      RawSplit split = new RawSplit();
-      split.setDataLength(0);
+      JobSplit.TaskSplitMetaInfo split =
+          new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
       TaskInProgress tip = 
         new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
@@ -82,8 +82,9 @@ public class TestResourceEstimation exte
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      RawSplit split = new RawSplit();
-      split.setDataLength(singleMapInputSize);
+      JobSplit.TaskSplitMetaInfo split =
+              new JobSplit.TaskSplitMetaInfo(new String[0], 0,
+                                           singleMapInputSize);
       TaskInProgress tip = 
         new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
@@ -95,8 +96,8 @@ public class TestResourceEstimation exte
     //add one more map task with input size as 0
     TaskStatus ts = new MapTaskStatus();
     ts.setOutputSize(singleMapOutputSize);
-    RawSplit split = new RawSplit();
-    split.setDataLength(0);
+    JobSplit.TaskSplitMetaInfo split =
+        new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
     TaskInProgress tip = 
       new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java Fri Mar  4 03:41:31 2011
@@ -18,24 +18,58 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.URI;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;
 
 public class TestSubmitJob extends TestCase {
-  private MiniMRCluster miniMRCluster;
+  static final Log LOG = LogFactory.getLog(TestSubmitJob.class);
+  private MiniMRCluster mrCluster;
 
-  @Override
-  protected void tearDown()
-      throws Exception {
-    if (miniMRCluster != null) {
-      miniMRCluster.shutdown();
-    }
+  private MiniDFSCluster dfsCluster;
+  private JobTracker jt;
+  private FileSystem fs;
+  private static Path TEST_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"),
+             "job-submission-testing");
+  private static int numSlaves = 1;
+
+  private void startCluster() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    JobConf jConf = new JobConf(conf);
+    jConf.setLong("mapred.job.submission.expiry.interval", 6 * 1000);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves,
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
+        jConf);
+    jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    fs = FileSystem.get(mrCluster.createJobConf());
   }
 
+  private void stopCluster() throws Exception {
+    mrCluster.shutdown();
+    mrCluster = null;
+    dfsCluster.shutdown();
+    dfsCluster = null;
+    jt = null;
+    fs = null;
+  }
   /**
    * Test to verify that jobs with invalid memory requirements are killed at the
    * JT.
@@ -54,9 +88,9 @@ public class TestSubmitJob extends TestC
     jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
         4 * 1024L);
 
-    miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+    mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
 
-    JobConf clusterConf = miniMRCluster.createJobConf();
+    JobConf clusterConf = mrCluster.createJobConf();
 
     // No map-memory configuration
     JobConf jobConf = new JobConf(clusterConf);
@@ -83,6 +117,9 @@ public class TestSubmitJob extends TestC
     jobConf.setMemoryForReduceTask(5 * 1024L);
     runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
         "Exceeds the cluster's max-memory-limit.");
+    
+    mrCluster.shutdown();
+    mrCluster = null;
   }
 
   private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
@@ -108,4 +145,126 @@ public class TestSubmitJob extends TestC
         + " - doesn't contain expected message - " + overallExpectedMsg, msg
         .contains(overallExpectedMsg));
   }
-}
+  /** Returns an RPC proxy to the JobTracker at the address found in conf. */
+  static JobSubmissionProtocol getJobSubmitClient(JobConf conf,
+      UserGroupInformation ugi) throws IOException {
+    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+        JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), ugi,
+        conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+  }
+ 
+  /** Returns an RPC proxy for the HDFS ClientProtocol at the address that
+   *  NameNode.getAddress(conf) yields; ugi and conf go to RPC.getProxy. */
+  static org.apache.hadoop.hdfs.protocol.ClientProtocol getDFSClient(
+      Configuration conf, UserGroupInformation ugi) throws IOException {
+    final Class<org.apache.hadoop.hdfs.protocol.ClientProtocol> protocol =
+        org.apache.hadoop.hdfs.protocol.ClientProtocol.class;
+    return (org.apache.hadoop.hdfs.protocol.ClientProtocol) RPC.getProxy(
+        protocol, org.apache.hadoop.hdfs.protocol.ClientProtocol.versionID,
+        NameNode.getAddress(conf), ugi, conf,
+        NetUtils.getSocketFactory(conf, protocol));
+  }
+ 
+  /**
+   * Submit a job as one user and check that another user cannot list its files.
+   */
+  public void testSecureJobExecution() throws Exception {
+    LOG.info("Testing secure job submission/execution");
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      UnixUserGroupInformation.saveToConf(conf,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME,
+          TestMiniMRWithDFSWithDistinctUsers.DFS_UGI);
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fs = dfs.getFileSystem();
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs,
+          conf.get("mapreduce.jobtracker.staging.root.dir",
+              "/tmp/hadoop/mapred/staging"));
+      UnixUserGroupInformation MR_UGI =
+        TestMiniMRWithDFSWithDistinctUsers.createUGI(
+            UnixUserGroupInformation.login().getUserName(), false);
+      mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(),
+          1, null, null, MR_UGI);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+      // cleanup
+      dfs.getFileSystem().delete(TEST_DIR, true);
+
+      final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
+      final Path reduceSignalFile = new Path(TEST_DIR, "reduce-signal");
+
+      // create a ugi for user 1
+      UnixUserGroupInformation user1 =
+        TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
+      Path inDir = new Path("/user/input");
+      Path outDir = new Path("/user/output");
+      JobConf job =
+        TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user1);
+
+      UtilsForTests.configureWaitingJobConf(job, inDir, outDir, 2, 0,
+          "test-submit-job", mapSignalFile.toString(),
+          reduceSignalFile.toString());
+      job.set(UtilsForTests.getTaskSignalParameter(true),
+          mapSignalFile.toString());
+      job.set(UtilsForTests.getTaskSignalParameter(false),
+          reduceSignalFile.toString());
+      LOG.info("Submit job as the actual user (" + user1.getUserName() + ")");
+      JobClient jClient = new JobClient(job);
+      RunningJob rJob = jClient.submitJob(job);
+      JobID id = rJob.getID();
+      LOG.info("Running job " + id);
+
+      // create user2
+      UnixUserGroupInformation user2 =
+        TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
+      JobConf conf_other =
+        TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user2);
+      org.apache.hadoop.hdfs.protocol.ClientProtocol client =
+        getDFSClient(conf_other, user2);
+
+      // try accessing mapred.system.dir/jobid/*
+      boolean failed = false;
+      try {
+        Path path = new Path(new URI(jt.getSystemDir()).getPath());
+        LOG.info("Try listing the mapred-system-dir as the user ("
+            + user2.getUserName() + ")");
+        client.getListing(path.toString());
+      } catch (IOException ioe) {
+        failed = true;
+      }
+      assertTrue("JobTracker system dir is accessible to others", failed);
+      // try accessing ~/.staging/jobid/*
+      failed = false;
+      JobInProgress jip = jt.getJob(id);
+      Path jobSubmitDirpath =
+        new Path(jip.getJobConf().get("mapreduce.job.dir"));
+      try {
+        LOG.info("Try accessing the job folder for job " + id + " as the user ("
+            + user2.getUserName() + ")");
+        client.getListing(jobSubmitDirpath.toString());
+      } catch (IOException ioe) {
+        failed = true;
+      }
+      assertTrue("User's staging folder is accessible to others", failed);
+      UtilsForTests.signalTasks(dfs, fs, true, mapSignalFile.toString(),
+          reduceSignalFile.toString());
+      // wait for job to be done
+      UtilsForTests.waitTillDone(jClient);
+
+      // check if the staging area is cleaned up
+      LOG.info("Check if job submit dir is cleanup or not");
+      assertFalse(fs.exists(jobSubmitDirpath));
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java Fri Mar  4 03:41:31 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 import org.junit.After;
 import org.junit.Test;
@@ -140,7 +141,8 @@ public class TestTaskLogsMonitor {
     int taskcount = 0;
 
     TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
-    Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+                            0);
 
     // Let the tasks write logs within retain-size
     writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 500, 'H');
@@ -181,7 +183,8 @@ public class TestTaskLogsMonitor {
     int taskcount = 0;
 
     TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
-    Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+                            0);
 
     // Let the tasks write some logs
     writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
@@ -218,7 +221,8 @@ public class TestTaskLogsMonitor {
     int taskcount = 0;
 
     TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
-    Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(), 
+                            0);
 
     // Let the tasks write logs more than retain-size
     writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
@@ -259,7 +263,8 @@ public class TestTaskLogsMonitor {
 
     // Assuming the job's retain size is 150
     TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
-    Task task1 = new MapTask(null, attempt1, 0, null, null, 0, null);
+    Task task1 = new MapTask(null, attempt1, 0, new JobSplit.TaskSplitIndex(),
+                             0);
 
     // Let the tasks write logs more than retain-size
     writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
@@ -271,7 +276,8 @@ public class TestTaskLogsMonitor {
 
     // Start another attempt in the same JVM
     TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
-    Task task2 = new MapTask(null, attempt2, 0, null, null, 0, null);
+    Task task2 = new MapTask(null, attempt2, 0, new JobSplit.TaskSplitIndex(),
+                             0);
     logsMonitor.monitorTaskLogs();
 
     // Let attempt2 also write some logs
@@ -280,7 +286,8 @@ public class TestTaskLogsMonitor {
 
     // Start yet another attempt in the same JVM
     TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
-    Task task3 = new MapTask(null, attempt3, 0, null, null, 0, null);
+    Task task3 = new MapTask(null, attempt3, 0, new JobSplit.TaskSplitIndex(),
+                             0);
     logsMonitor.monitorTaskLogs();
 
     // Let attempt3 also write some logs

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 03:41:31 2011
@@ -254,7 +254,9 @@ public class UtilsForTests {
     while (true) {
       boolean shouldWait = false;
       for (JobStatus jobStatuses : jobClient.getAllJobs()) {
-        if (jobStatuses.getRunState() == JobStatus.RUNNING) {
+        if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
+            && jobStatuses.getRunState() != JobStatus.FAILED
+            && jobStatuses.getRunState() != JobStatus.KILLED) {
           shouldWait = true;
           break;
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java Fri Mar  4 03:41:31 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -423,7 +424,13 @@ public class DistCh extends DistTool {
   private boolean setup(List<FileOperation> ops, Path log) throws IOException {
     final String randomId = getRandomId();
     JobClient jClient = new JobClient(jobconf);
-    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    Path stagingArea = JobSubmissionFiles.getStagingDir(jClient, jobconf);
+    // Resolve the job dir as a child of the staging dir. Concatenating the
+    // two as strings would drop the path separator between them.
+    Path jobdir = new Path(stagingArea, NAME + "_" + randomId);
+    FsPermission mapredSysPerms =
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
     LOG.info(JOB_DIR_LABEL + "=" + jobdir);
 
     if (log == null) {