Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/12/21 18:36:48 UTC

svn commit: r892893 [2/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src...

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+/**
+ * A utility to manage job submission files.
+ */
+@InterfaceAudience.Private
+public class JobSubmissionFiles {
+
+  // job submission directory is private!
+  final public static FsPermission JOB_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0700); // rwx------
+  // job files are world readable and owner writable
+  final public static FsPermission JOB_FILE_PERMISSION = 
+    FsPermission.createImmutable((short) 0644); // rw-r--r--
+  
+  public static Path getJobSplitFile(Path jobSubmissionDir) {
+    return new Path(jobSubmissionDir, "job.split");
+  }
+
+  public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
+    return new Path(jobSubmissionDir, "job.splitmetainfo");
+  }
+  
+  /**
+   * Get the job conf path.
+   */
+  public static Path getJobConfPath(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "job.xml");
+  }
+    
+  /**
+   * Get the job jar path.
+   */
+  public static Path getJobJar(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "job.jar");
+  }
+  
+  /**
+   * Get the job distributed cache files path.
+   * @param jobSubmitDir
+   */
+  public static Path getJobDistCacheFiles(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "files");
+  }
+  /**
+   * Get the job distributed cache archives path.
+   * @param jobSubmitDir 
+   */
+  public static Path getJobDistCacheArchives(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "archives");
+  }
+  /**
+   * Get the job distributed cache libjars path.
+   * @param jobSubmitDir 
+   */
+  public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
+    return new Path(jobSubmitDir, "libjars");
+  }
+
+  /**
+   * Initializes the staging directory and returns the path. It also
+   * verifies the required ownership and permissions.
+   * @param cluster
+   * @param conf
+   */
+  public static Path getStagingDir(Cluster cluster, Configuration conf) 
+  throws IOException, InterruptedException {
+    Path stagingArea = cluster.getStagingAreaDir();
+    FileSystem fs = stagingArea.getFileSystem(conf);
+    String realUser;
+    String currentUser;
+    try {
+      UserGroupInformation ugi = UnixUserGroupInformation.login();
+      realUser = ugi.getUserName();
+      ugi = UnixUserGroupInformation.login(conf);
+      currentUser = ugi.getUserName();
+    } catch (LoginException le) {
+      throw new IOException(le);
+    }
+    if (fs.exists(stagingArea)) {
+      FileStatus fsStatus = fs.getFileStatus(stagingArea);
+      String owner = fsStatus.getOwner();
+      if (!(owner.equals(currentUser) || owner.equals(realUser)) || 
+          !fsStatus.getPermission().
+                               equals(JOB_DIR_PERMISSION)) {
+         throw new IOException("The ownership/permissions on the staging " +
+                      "directory " + stagingArea + " are not as expected. " +
+                      "It is owned by " + owner + " and permissions are "+ 
+                      fsStatus.getPermission() + ". The directory must " +
+                      "be owned by the submitter " + currentUser + " or " +
+                      "by " + realUser + " and permissions must be rwx------");
+      }
+    } else {
+      fs.mkdirs(stagingArea, 
+          new FsPermission(JOB_DIR_PERMISSION));
+    }
+    return stagingArea;
+  }
+  
+}
\ No newline at end of file
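
A rough sketch of how the new path helpers compose. The staging root and job id below are hypothetical, chosen only to illustrate the resulting layout:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobSubmissionFiles;

    public class SubmitDirLayoutSketch {
      public static void main(String[] args) {
        // Hypothetical staging root and job id, for illustration only.
        Path stagingArea  = new Path("/user/alice/.staging");
        Path submitJobDir = new Path(stagingArea, "job_200912211736_0001");
        System.out.println(JobSubmissionFiles.getJobConfPath(submitJobDir));          // .../job.xml
        System.out.println(JobSubmissionFiles.getJobJar(submitJobDir));               // .../job.jar
        System.out.println(JobSubmissionFiles.getJobSplitFile(submitJobDir));         // .../job.split
        System.out.println(JobSubmissionFiles.getJobSplitMetaFile(submitJobDir));     // .../job.splitmetainfo
        System.out.println(JobSubmissionFiles.getJobDistCacheFiles(submitJobDir));    // .../files
        System.out.println(JobSubmissionFiles.getJobDistCacheArchives(submitJobDir)); // .../archives
        System.out.println(JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir));  // .../libjars
      }
    }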

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Mon Dec 21 17:36:44 2009
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
@@ -35,14 +34,11 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 class JobSubmitter {
@@ -128,12 +124,7 @@
     String files = conf.get("tmpfiles");
     String libjars = conf.get("tmpjars");
     String archives = conf.get("tmparchives");
-      
-    /*
-     * set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    job.setUGIAndUserGroupNames();
+    String jobJar = job.getJar();
 
     //
     // Figure out what fs the JobTracker is using.  Copy the
@@ -145,14 +136,18 @@
 
     // Create a number of filenames in the JobTracker's fs namespace
     LOG.debug("default FileSystem: " + jtFs.getUri());
-    jtFs.delete(submitJobDir, true);
+    if (jtFs.exists(submitJobDir)) {
+      throw new IOException("Not submitting job. Job directory " + submitJobDir
+          + " already exists! This is unexpected. Please check what is in" +
+          " that directory");
+    }
     submitJobDir = jtFs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
-    FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+    FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
-    Path filesDir = new Path(submitJobDir, "files");
-    Path archivesDir = new Path(submitJobDir, "archives");
-    Path libjarsDir = new Path(submitJobDir, "libjars");
+    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     // add all the command line files/ jars and archive
     // first copy them to jobtrackers filesystem 
       
@@ -185,7 +180,8 @@
       for (String tmpjars: libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
-        DistributedCache.addFileToClassPath(newPath, conf);
+        DistributedCache.addFileToClassPath(
+            new Path(newPath.toUri().getPath()), conf);
       }
     }
       
@@ -212,11 +208,24 @@
         DistributedCache.createSymlink(conf);
       }
     }
-      
+
+    if (jobJar != null) {   // copy jar to JobTracker's fs
+      // use jar name if job is not named. 
+      if ("".equals(job.getJobName())){
+        job.setJobName(new Path(jobJar).getName());
+      }
+      copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir), 
+          replication);
+      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+    } else {
+      LOG.warn("No job jar file set.  User classes may not be found. "+
+      "See Job or Job#setJar(String).");
+    }
+
     //  set the timestamps of the archives and files
     TrackerDistributedCacheManager.determineTimestamps(conf);
   }
-
+  
   private URI getPathURI(Path destPath, String fragment) 
       throws URISyntaxException {
     URI pathURI = destPath.toUri();
@@ -234,36 +243,20 @@
       short replication) throws IOException {
     jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
     jtFs.setReplication(submitJarFile, replication);
-    jtFs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+    jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
   }
+  
   /**
    * configure the jobconf of the user with the command line options of 
    * -libjars, -files, -archives.
    * @param conf
    * @throws IOException
    */
-  private void configureCommandLineOptions(Job job, Path submitJobDir,
-      Path submitJarFile) throws IOException {
+  private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
+  throws IOException {
     Configuration conf = job.getConfiguration();
     short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
-    copyAndConfigureFiles(job, submitJobDir, replication);
-    
-    /* set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    String originalJarPath = job.getJar();
-
-    if (originalJarPath != null) {           // copy jar to JobTracker's fs
-      // use jar name if job is not named. 
-      if ("".equals(job.getJobName())){
-        job.setJobName(new Path(originalJarPath).getName());
-      }
-      job.setJar(submitJarFile.toString());
-      copyJar(new Path(originalJarPath), submitJarFile, replication);
-    } else {
-      LOG.warn("No job jar file set.  User classes may not be found. "+
-               "See Job or Job#setJar(String).");
-    }
+    copyAndConfigureFiles(job, jobSubmitDir, replication);
 
     // Set the working directory
     if (job.getWorkingDirectory() == null) {
@@ -271,15 +264,6 @@
     }
 
   }
-
-  // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  // job submission directory is world readable/writable/executable
-  final static FsPermission JOB_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-   
   /**
    * Internal method for submitting jobs to the system.
    * 
@@ -305,45 +289,60 @@
    *   </li>
    * </ol></p>
    * @param job the configuration to submit
+   * @param cluster the handle to the Cluster
    * @throws ClassNotFoundException
    * @throws InterruptedException
    * @throws IOException
    */
-  JobStatus submitJobInternal(Job job) throws ClassNotFoundException,
-      InterruptedException, IOException {
-    
+  JobStatus submitJobInternal(Job job, Cluster cluster) 
+  throws ClassNotFoundException, InterruptedException, IOException {
+    /*
+     * set this user's id in job configuration, so later job files can be
+     * accessed using this user's id
+     */
+    job.setUGIAndUserGroupNames();
+
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
+                                                     job.getConfiguration());
     //configure the command line options correctly on the submitting dfs
     Configuration conf = job.getConfiguration();
     JobID jobId = submitClient.getNewJobID();
-    Path submitJobDir = new Path(submitClient.getSystemDir(), jobId.toString());
-    Path submitJarFile = new Path(submitJobDir, "job.jar");
-    Path submitSplitFile = new Path(submitJobDir, "job.split");
-    configureCommandLineOptions(job, submitJobDir, submitJarFile);
-    Path submitJobFile = new Path(submitJobDir, "job.xml");
-    
-    checkSpecs(job);
-
-    // Create the splits for the job
-    LOG.info("Creating splits at " + jtFs.makeQualified(submitSplitFile));
-    int maps = writeSplits(job, submitSplitFile);
-    conf.set("mapred.job.split.file", submitSplitFile.toString());
-    conf.setInt("mapred.map.tasks", maps);
-    LOG.info("number of splits:" + maps);
-    
-    // Write job file to JobTracker's fs
-    writeConf(conf, submitJobFile);
-    
-    //
-    // Now, actually submit the job (using the submit name)
-    //
-    JobStatus status = submitClient.submitJob(jobId);
-    if (status != null) {
-      return status;
-    } else {
-      throw new IOException("Could not launch job");
+    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+    JobStatus status = null;
+    try {
+      conf.set("mapreduce.job.dir", submitJobDir.toString());
+      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
+          + " as the submit dir");
+      copyAndConfigureFiles(job, submitJobDir);
+      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
+
+      checkSpecs(job);
+
+      // Create the splits for the job
+      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
+      int maps = writeSplits(job, submitJobDir);
+      conf.setInt("mapred.map.tasks", maps);
+      LOG.info("number of splits:" + maps);
+
+      // Write job file to submit dir
+      writeConf(conf, submitJobFile);
+      //
+      // Now, actually submit the job (using the submit name)
+      //
+      status = submitClient.submitJob(jobId, submitJobDir.toString());
+      if (status != null) {
+        return status;
+      } else {
+        throw new IOException("Could not launch job");
+      }
+    } finally {
+      if (status == null) {
+        LOG.info("Cleaning up the staging area " + submitJobDir);
+        jtFs.delete(submitJobDir, true);
+      }
     }
   }
-
+  
   private void checkSpecs(Job job) throws ClassNotFoundException, 
       InterruptedException, IOException {
     JobConf jConf = (JobConf)job.getConfiguration();
@@ -364,7 +363,7 @@
     // Write job file to JobTracker's fs        
     FSDataOutputStream out = 
       FileSystem.create(jtFs, jobFile, 
-                        new FsPermission(JOB_FILE_PERMISSION));
+                        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
     try {
       conf.writeXml(out);
     } finally {
@@ -372,81 +371,42 @@
     }
   }
   
+
   @SuppressWarnings("unchecked")
-  private <T extends InputSplit> 
-  int writeNewSplits(JobContext job, Path submitSplitFile) throws IOException,
+  private <T extends InputSplit>
+  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
       InterruptedException, ClassNotFoundException {
     Configuration conf = job.getConfiguration();
     InputFormat<?, ?> input =
       ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
-    
+
     List<InputSplit> splits = input.getSplits(job);
     T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
     // sort the splits into order based on size, so that the biggest
     // go first
     Arrays.sort(array, new SplitComparator());
-    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
-                                                 array.length);
-    try {
-      if (array.length != 0) {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        Job.RawSplit rawSplit = new Job.RawSplit();
-        SerializationFactory factory = new SerializationFactory(conf);
-        Serializer<T> serializer = 
-          factory.getSerializer((Class<T>) array[0].getClass());
-        serializer.open(buffer);
-        for (T split: array) {
-          rawSplit.setClassName(split.getClass().getName());
-          buffer.reset();
-          serializer.serialize(split);
-          rawSplit.setDataLength(split.getLength());
-          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-          rawSplit.setLocations(split.getLocations());
-          rawSplit.write(out);
-        }
-        serializer.close();
-      }
-    } finally {
-      out.close();
-    }
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
     return array.length;
   }
-
-  static final int CURRENT_SPLIT_FILE_VERSION = 0;
-  static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
-  private DataOutputStream writeSplitsFileHeader(Configuration conf,
-      Path filename, int length) throws IOException {
-    // write the splits to a file for the job tracker
-    FileSystem fs = filename.getFileSystem(conf);
-    FSDataOutputStream out = 
-      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
-    out.write(SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-    WritableUtils.writeVInt(out, length);
-    return out;
-  }
-
+  
   private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
-                  Path submitSplitFile) throws IOException,
+      Path jobSubmitDir) throws IOException,
       InterruptedException, ClassNotFoundException {
     JobConf jConf = (JobConf)job.getConfiguration();
-    // Create the splits for the job
-    LOG.debug("Creating splits at " + jtFs.makeQualified(submitSplitFile));
     int maps;
     if (jConf.getUseNewMapper()) {
-      maps = writeNewSplits(job, submitSplitFile);
+      maps = writeNewSplits(job, jobSubmitDir);
     } else {
-      maps = writeOldSplits(jConf, submitSplitFile);
+      maps = writeOldSplits(jConf, jobSubmitDir);
     }
     return maps;
   }
-
-  // method to write splits for old api mapper.
-  private int writeOldSplits(JobConf job, 
-      Path submitSplitFile) throws IOException {
-    org.apache.hadoop.mapred.InputSplit[] splits = 
+  
+  //method to write splits for old api mapper.
+  private int writeOldSplits(JobConf job, Path jobSubmitDir) 
+  throws IOException {
+    org.apache.hadoop.mapred.InputSplit[] splits =
     job.getInputFormat().getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
     // go first
@@ -468,24 +428,7 @@
         }
       }
     });
-    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile,
-      splits.length);
-
-    try {
-      DataOutputBuffer buffer = new DataOutputBuffer();
-      Job.RawSplit rawSplit = new Job.RawSplit();
-      for (org.apache.hadoop.mapred.InputSplit split: splits) {
-        rawSplit.setClassName(split.getClass().getName());
-        buffer.reset();
-        split.write(buffer);
-        rawSplit.setDataLength(split.getLength());
-        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-        rawSplit.setLocations(split.getLocations());
-        rawSplit.write(out);
-      }
-    } finally {
-      out.close();
-    }
+    JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
     return splits.length;
   }
   
@@ -505,7 +448,7 @@
       } catch (IOException ie) {
         throw new RuntimeException("exception in compare", ie);
       } catch (InterruptedException ie) {
-        throw new RuntimeException("exception in compare", ie);        
+        throw new RuntimeException("exception in compare", ie);
       }
     }
   }
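
Two behavioural changes in the rewritten JobSubmitter are worth noting: the per-job submit directory now lives under the user's staging area rather than the JobTracker system directory, and a pre-existing submit directory aborts the submission, while a partially staged directory is deleted if submission fails. A minimal sketch of the guarded-create step, using only calls that appear in the diff above:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.mapreduce.JobSubmissionFiles;

    public class SubmitDirGuardSketch {
      // Refuse to reuse an existing submit directory, then create it with the
      // private 0700 permission defined in JobSubmissionFiles.
      static void createSubmitDir(FileSystem jtFs, Path submitJobDir) throws IOException {
        if (jtFs.exists(submitJobDir)) {
          throw new IOException("Job directory " + submitJobDir + " already exists");
        }
        FileSystem.mkdirs(jtFs, submitJobDir,
            new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION));
      }
    }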

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Mon Dec 21 17:36:44 2009
@@ -24,6 +24,7 @@
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,
   MAP_OUTPUT_BYTES,
+  SPLIT_RAW_BYTES,
   COMBINE_INPUT_RECORDS,
   COMBINE_OUTPUT_RECORDS,
   REDUCE_INPUT_GROUPS,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Mon Dec 21 17:36:44 2009
@@ -125,7 +125,6 @@
     Deserializer deserializer = factory.getDeserializer(inputSplitClass);
     deserializer.open((DataInputStream)in);
     inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
-    deserializer.close();
   }
 
   private Class<?> readClass(DataInput in) throws IOException {
@@ -147,7 +146,6 @@
           factory.getSerializer(inputSplitClass);
     serializer.open((DataOutputStream)out);
     serializer.serialize(inputSplit);
-    serializer.close();
   }
 
   public Configuration getConf() {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java Mon Dec 21 17:36:44 2009
@@ -128,7 +128,6 @@
         factory.getSerializer(s.getClass());
       serializer.open((DataOutputStream)out);
       serializer.serialize(s);
-      serializer.close();
     }
   }
 
@@ -155,7 +154,6 @@
         Deserializer deserializer = factory.getDeserializer(cls[i]);
         deserializer.open((DataInputStream)in);
         splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
-        deserializer.close();
       }
     } catch (ClassNotFoundException e) {
       throw new IOException("Failed split init", e);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Mon Dec 21 17:36:44 2009
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Counters;
@@ -87,8 +86,11 @@
    * Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
    * Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
    *             to ClusterMetrics as part of MAPREDUCE-1048.
+   * Version 30: Job submission files are uploaded to a staging area under
+   *             the user's home directory. The JobTracker reads the required
+   *             files from the staging area using user credentials passed via RPC.
    */
-  public static final long versionID = 29L;
+  public static final long versionID = 30L;
 
   /**
    * Allocate a name for the job.
@@ -100,9 +102,8 @@
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
-   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    */
-  public JobStatus submitJob(JobID jobName) 
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir) 
   throws IOException, InterruptedException;
 
   /**
@@ -219,7 +220,15 @@
    * 
    * @return the system directory where job-specific files are to be placed.
    */
-  public String getSystemDir() throws IOException, InterruptedException;  
+  public String getSystemDir() throws IOException, InterruptedException;
+  
+  /**
+   * Get a hint from the JobTracker 
+   * where job-specific files are to be placed.
+   * 
+   * @return the directory where job-specific files are to be placed.
+   */
+  public String getStagingAreaDir() throws IOException, InterruptedException;
 
   /**
    * Gets the directory location of the completed job history files.
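
With the bump from version 29 to 30, the client tells the JobTracker where the staged files live instead of relying on a fixed system-dir layout. A sketch of the new handshake against a ClientProtocol handle; how the proxy is obtained is left out, and the method simply propagates checked exceptions:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

    public class SubmitHandshakeSketch {
      static JobStatus submit(ClientProtocol client) throws Exception {
        String stagingRoot = client.getStagingAreaDir();     // new in version 30
        JobID jobId = client.getNewJobID();
        Path submitJobDir = new Path(stagingRoot, jobId.toString());
        // job.xml, job.jar, job.split and job.splitmetainfo are expected to be
        // uploaded under submitJobDir before the call below.
        return client.submitJob(jobId, submitJobDir.toString());
      }
    }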

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Mon Dec 21 17:36:44 2009
@@ -80,6 +80,8 @@
   public static final String JT_AVG_BLACKLIST_THRESHOLD = 
     "mapreduce.jobtracker.blacklist.average.threshold";
   public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";
+  public static final String JT_STAGING_AREA_ROOT = 
+    "mapreduce.jobtracker.staging.root.dir";
   public static final String JT_MAX_TRACKER_BLACKLISTS = 
     "mapreduce.jobtracker.tasktracker.maxblacklists";
   public static final String JT_JOBHISTORY_MAXAGE = 
@@ -88,4 +90,6 @@
     "mapreduce.jobtracker.maxmapmemory.mb";
   public static final String JT_MAX_REDUCEMEMORY_MB = 
     "mapreduce.jobtracker.maxreducememory.mb";
+  public static final String MAX_JOB_SPLIT_METAINFO_SIZE = 
+  "mapreduce.job.split.metainfo.maxsize";
 }
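
Both new keys are read by name elsewhere in this commit: the staging root by the cluster setup code and the metainfo size cap by SplitMetaInfoReader. A small sketch of setting them programmatically; the values are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    public class StagingConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder values, shown only to illustrate the two new keys.
        conf.set(JTConfig.JT_STAGING_AREA_ROOT, "/user");              // mapreduce.jobtracker.staging.root.dir
        conf.setLong(JTConfig.MAX_JOB_SPLIT_METAINFO_SIZE, 10000000L); // mapreduce.job.split.metainfo.maxsize
      }
    }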

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class groups the fundamental classes associated with
+ * reading/writing splits. The split information is divided into
+ * two parts based on the consumer of the information. The two
+ * parts are the split meta information, and the raw split 
+ * information. The first part is consumed by the JobTracker to
+ * create the tasks' locality data structures. The second part is
+ * read by the map tasks at runtime to know what data to process.
+ * These pieces of information are written to two separate files.
+ * The metainformation file is slurped by the JobTracker during 
+ * job initialization. A map task gets the meta information during
+ * the launch and it reads the raw split bytes directly from the 
+ * file.
+ */
+@InterfaceAudience.Private
+public class JobSplit {
+  static final int META_SPLIT_VERSION = 1;
+  static final byte[] META_SPLIT_FILE_HEADER;
+  static {
+    try {
+      META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
+    } catch (UnsupportedEncodingException u) {
+      throw new RuntimeException(u);
+    }
+  } 
+  public static final TaskSplitMetaInfo EMPTY_TASK_SPLIT = 
+    new TaskSplitMetaInfo();
+  
+  /**
+   * This represents the meta information about the task split.
+   * The main fields are 
+   *     - start offset in actual split
+   *     - data length that will be processed in this split
+   *     - hosts on which this split is local
+   */
+  public static class SplitMetaInfo implements Writable {
+    private long startOffset;
+    private long inputDataLength;
+    private String[] locations;
+
+    public SplitMetaInfo() {}
+    
+    public SplitMetaInfo(String[] locations, long startOffset, 
+        long inputDataLength) {
+      this.locations = locations;
+      this.startOffset = startOffset;
+      this.inputDataLength = inputDataLength;
+    }
+    
+    public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
+      try {
+        this.locations = split.getLocations();
+        this.inputDataLength = split.getLength();
+        this.startOffset = startOffset;
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    
+    public String[] getLocations() {
+      return locations;
+    }
+  
+    public long getStartOffset() {
+      return startOffset;
+    }
+      
+    public long getInputDataLength() {
+      return inputDataLength;
+    }
+    
+    public void setInputDataLocations(String[] locations) {
+      this.locations = locations;
+    }
+    
+    public void setInputDataLength(long length) {
+      this.inputDataLength = length;
+    }
+    
+    public void readFields(DataInput in) throws IOException {
+      int len = WritableUtils.readVInt(in);
+      locations = new String[len];
+      for (int i = 0; i < locations.length; i++) {
+        locations[i] = Text.readString(in);
+      }
+      startOffset = WritableUtils.readVLong(in);
+      inputDataLength = WritableUtils.readVLong(in);
+    }
+  
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVInt(out, locations.length);
+      for (int i = 0; i < locations.length; i++) {
+        Text.writeString(out, locations[i]);
+      }
+      WritableUtils.writeVLong(out, startOffset);
+      WritableUtils.writeVLong(out, inputDataLength);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("data-size : " + inputDataLength + "\n");
+      buf.append("start-offset : " + startOffset + "\n");
+      buf.append("locations : " + "\n");
+      for (String loc : locations) {
+        buf.append("  " + loc + "\n");
+      }
+      return buf.toString();
+    }
+  }
+  /**
+   * This represents the meta information about the task split that the 
+   * JobTracker creates
+   */
+  public static class TaskSplitMetaInfo {
+    private TaskSplitIndex splitIndex;
+    private long inputDataLength;
+    private String[] locations;
+    public TaskSplitMetaInfo(){
+      this.splitIndex = new TaskSplitIndex();
+      this.locations = new String[0];
+    }
+    public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations, 
+        long inputDataLength) {
+      this.splitIndex = splitIndex;
+      this.locations = locations;
+      this.inputDataLength = inputDataLength;
+    }
+    public TaskSplitMetaInfo(InputSplit split, long startOffset) 
+    throws InterruptedException, IOException {
+      this(new TaskSplitIndex("", startOffset), split.getLocations(), 
+          split.getLength());
+    }
+    
+    public TaskSplitMetaInfo(String[] locations, long startOffset, 
+        long inputDataLength) {
+      this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
+    }
+    
+    public TaskSplitIndex getSplitIndex() {
+      return splitIndex;
+    }
+    
+    public String getSplitLocation() {
+      return splitIndex.getSplitLocation();
+    }
+    public long getInputDataLength() {
+      return inputDataLength;
+    }
+    public String[] getLocations() {
+      return locations;
+    }
+    public long getStartOffset() {
+      return splitIndex.getStartOffset();
+    }
+  }
+  
+  /**
+   * This represents the meta information about the task split that the 
+   * task gets
+   */
+  public static class TaskSplitIndex {
+    private String splitLocation;
+    private long startOffset;
+    public TaskSplitIndex(){
+      this("", 0);
+    }
+    public TaskSplitIndex(String splitLocation, long startOffset) {
+      this.splitLocation = splitLocation;
+      this.startOffset = startOffset;
+    }
+    public long getStartOffset() {
+      return startOffset;
+    }
+    public String getSplitLocation() {
+      return splitLocation;
+    }
+    public void readFields(DataInput in) throws IOException {
+      splitLocation = Text.readString(in);
+      startOffset = WritableUtils.readVLong(in);
+    }
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, splitLocation);
+      WritableUtils.writeVLong(out, startOffset);
+    }
+  }
+}
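
SplitMetaInfo is a plain Writable, so it can be exercised in isolation. The round trip below uses hypothetical host names and sizes and only the constructor, write() and readFields() defined above:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.split.JobSplit;

    public class SplitMetaInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        // Hypothetical locations, start offset and input length.
        JobSplit.SplitMetaInfo written = new JobSplit.SplitMetaInfo(
            new String[] {"host1", "host2"}, 0L, 67108864L);

        DataOutputBuffer out = new DataOutputBuffer();
        written.write(out);                        // locations, start offset, length

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        JobSplit.SplitMetaInfo read = new JobSplit.SplitMetaInfo();
        read.readFields(in);

        System.out.println(read);                  // uses SplitMetaInfo.toString()
      }
    }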

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * The class used by job clients to write the splits (both the split meta
+ * info file and the raw split bytes file).
+ */
+public class JobSplitWriter {
+
+  private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
+  private static final byte[] SPLIT_FILE_HEADER;
+  static {
+    try {
+      SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
+    } catch (UnsupportedEncodingException u) {
+      throw new RuntimeException(u);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
+      Configuration conf, List<InputSplit> splits) 
+  throws IOException, InterruptedException {
+    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+    createSplitFiles(jobSubmitDir, conf, array);
+  }
+  
+  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
+      Configuration conf,T[] splits) 
+  throws IOException, InterruptedException {
+    FileSystem fs = jobSubmitDir.getFileSystem(conf);
+    FSDataOutputStream out = createFile(fs, 
+        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
+    out.close();
+    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
+        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+        info);
+  }
+  
+  public static void createSplitFiles(Path jobSubmitDir, 
+      Configuration conf, org.apache.hadoop.mapred.InputSplit[] splits) 
+  throws IOException {
+    FileSystem fs = jobSubmitDir.getFileSystem(conf);
+    FSDataOutputStream out = createFile(fs, 
+        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+    SplitMetaInfo[] info = writeOldSplits(splits, out);
+    out.close();
+    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
+        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+        info);
+  }
+  
+  private static FSDataOutputStream createFile(FileSystem fs, Path splitFile, 
+      Configuration job)  throws IOException {
+    FSDataOutputStream out = FileSystem.create(fs, splitFile, 
+        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+    int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
+    fs.setReplication(splitFile, (short)replication);
+    writeSplitHeader(out);
+    return out;
+  }
+  private static void writeSplitHeader(FSDataOutputStream out) 
+  throws IOException {
+    out.write(SPLIT_FILE_HEADER);
+    out.writeInt(splitVersion);
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static <T extends InputSplit> 
+  SplitMetaInfo[] writeNewSplits(Configuration conf, 
+      T[] array, FSDataOutputStream out)
+  throws IOException, InterruptedException {
+
+    SplitMetaInfo[] info = new SplitMetaInfo[array.length];
+    if (array.length != 0) {
+      SerializationFactory factory = new SerializationFactory(conf);
+      int i = 0;
+      long offset = out.size();
+      for(T split: array) {
+        int prevCount = out.size();
+        Text.writeString(out, split.getClass().getName());
+        Serializer<T> serializer = 
+          factory.getSerializer((Class<T>) split.getClass());
+        serializer.open(out);
+        serializer.serialize(split);
+        int currCount = out.size();
+        info[i++] = 
+          new JobSplit.SplitMetaInfo( 
+              split.getLocations(), offset,
+              split.getLength());
+        offset += currCount - prevCount;
+      }
+    }
+    return info;
+  }
+  
+  private static SplitMetaInfo[] writeOldSplits(
+      org.apache.hadoop.mapred.InputSplit[] splits,
+      FSDataOutputStream out) throws IOException {
+    SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
+    if (splits.length != 0) {
+      int i = 0;
+      long offset = out.size();
+      for(org.apache.hadoop.mapred.InputSplit split: splits) {
+        int prevLen = out.size();
+        Text.writeString(out, split.getClass().getName());
+        split.write(out);
+        int currLen = out.size();
+        info[i++] = new JobSplit.SplitMetaInfo( 
+            split.getLocations(), offset,
+            split.getLength());
+        offset += currLen - prevLen;
+      }
+    }
+    return info;
+  }
+
+  private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, 
+      FsPermission p, int splitMetaInfoVersion, 
+      JobSplit.SplitMetaInfo[] allSplitMetaInfo) 
+  throws IOException {
+    // write the splits meta-info to a file for the job tracker
+    FSDataOutputStream out = 
+      FileSystem.create(fs, filename, p);
+    out.write(JobSplit.META_SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(out, splitMetaInfoVersion);
+    WritableUtils.writeVInt(out, allSplitMetaInfo.length);
+    for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
+      splitMetaInfo.write(out);
+    }
+    out.close();
+  }
+}
+
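
On the writing side a job client only needs the submit directory, the configuration and the already-computed splits; both job.split and job.splitmetainfo come out of a single call. A minimal sketch for the new-API overload:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.split.JobSplitWriter;

    public class WriteSplitsSketch {
      // Persist splits under jobSubmitDir: job.split holds the raw serialized
      // splits, job.splitmetainfo their offsets, lengths and locations.
      static int writeSplits(Configuration conf, Path jobSubmitDir,
          List<InputSplit> splits) throws Exception {
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
        return splits.size();
      }
    }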

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * A utility that reads the split meta info and creates
+ * split meta info objects
+ */
+
+public class SplitMetaInfoReader {
+  
+  public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
+      JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
+  throws IOException {
+    long maxMetaInfoSize = conf.getLong(JTConfig.MAX_JOB_SPLIT_METAINFO_SIZE, 
+        10000000L);
+    Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
+    FileStatus fStatus = fs.getFileStatus(metaSplitFile);
+    if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+      throw new IOException("Split metadata size exceeded " +
+          maxMetaInfoSize +". Aborting job " + jobId);
+    }
+    FSDataInputStream in = fs.open(metaSplitFile);
+    byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != JobSplit.META_SPLIT_VERSION) {
+      in.close();
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
+    JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
+      new JobSplit.TaskSplitMetaInfo[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+      splitMetaInfo.readFields(in);
+      JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+          JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(), 
+          splitMetaInfo.getStartOffset());
+      allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, 
+          splitMetaInfo.getLocations(), 
+          splitMetaInfo.getInputDataLength());
+    }
+    in.close();
+    return allSplitMetaInfo;
+  }
+
+}
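
And the read side, roughly as the JobTracker would consume it during job initialization; jobSubmitDir is assumed to already contain the files written by JobSplitWriter:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.split.JobSplit;
    import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

    public class ReadSplitMetaInfoSketch {
      // Print the per-task locality hints recorded in job.splitmetainfo.
      static void dump(Configuration conf, JobID jobId, Path jobSubmitDir)
          throws java.io.IOException {
        FileSystem fs = jobSubmitDir.getFileSystem(conf);
        JobSplit.TaskSplitMetaInfo[] metaInfo =
            SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
        for (JobSplit.TaskSplitMetaInfo info : metaInfo) {
          System.out.println(info.getSplitLocation() + " @ " + info.getStartOffset()
              + " len=" + info.getInputDataLength()
              + " hosts=" + java.util.Arrays.toString(info.getLocations()));
        }
      }
    }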

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Mon Dec 21 17:36:44 2009
@@ -185,8 +185,6 @@
       new String[] {JobContext.QUEUE_NAME});
     Configuration.addDeprecation("mapred.job.reuse.jvm.num.tasks", 
       new String[] {JobContext.JVM_NUMTASKS_TORUN});
-    Configuration.addDeprecation("mapred.job.split.file", 
-      new String[] {JobContext.SPLIT_FILE});
     Configuration.addDeprecation("mapred.map.tasks", 
       new String[] {JobContext.NUM_MAPS});
     Configuration.addDeprecation("mapred.max.tracker.failures", 

Modified: hadoop/mapreduce/trunk/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred-site.xml?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/trunk/src/test/mapred-site.xml Mon Dec 21 17:36:44 2009
@@ -19,4 +19,9 @@
   <value>false</value>
   <description></description>
 </property>
+<property>
+  <name>mapreduce.jobtracker.staging.root.dir</name>
+  <value>${hadoop.tmp.dir}/staging</value>
+  <description></description>
+</property>
 </configuration>

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Mon Dec 21 17:36:44 2009
@@ -28,8 +28,10 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -142,10 +144,10 @@
     String[] splits = ugi.split(",");
     taskControllerUser = new UnixUserGroupInformation(splits);
     clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
-    createHomeDirectory(clusterConf);
+    createHomeAndStagingDirectory(clusterConf);
   }
 
-  private void createHomeDirectory(JobConf conf)
+  private void createHomeAndStagingDirectory(JobConf conf)
       throws IOException {
     FileSystem fs = dfsCluster.getFileSystem();
     String path = "/user/" + taskControllerUser.getUserName();
@@ -153,6 +155,10 @@
     LOG.info("Creating Home directory : " + homeDirectory);
     fs.mkdirs(homeDirectory);
     changePermission(conf, homeDirectory);
+    Path stagingArea = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+    LOG.info("Creating Staging root directory : " + stagingArea);
+    fs.mkdirs(stagingArea);
+    fs.setPermission(stagingArea, new FsPermission((short)0777));
   }
 
   private void changePermission(JobConf conf, Path p)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Mon Dec 21 17:36:44 2009
@@ -29,11 +29,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 
 /** 
  * Utilities used in unit test.
@@ -75,7 +75,6 @@
   }
 
   static class FakeJobInProgress extends JobInProgress {
-    Job.RawSplit[] rawSplits;
     @SuppressWarnings("deprecation")
     FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
       super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
@@ -89,27 +88,27 @@
     @Override
     public synchronized void initTasks() throws IOException {
      
-      Job.RawSplit[] splits = createSplits();
-      numMapTasks = splits.length;
-      createMapTasks(null, splits);
-      nonRunningMapCache = createCache(splits, maxLevel);
+      TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+      numMapTasks = taskSplitMetaInfo.length;
+      createMapTasks(null, taskSplitMetaInfo);
+      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
       createReduceTasks(null);
       tasksInited.set(true);
       this.status.setRunState(JobStatus.RUNNING);
     }
     
     @Override
-    Job.RawSplit[] createSplits(){
-      Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
+    TaskSplitMetaInfo [] createSplits(org.apache.hadoop.mapreduce.JobID jobId){
+      TaskSplitMetaInfo[] splits = 
+        new TaskSplitMetaInfo[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
-        splits[i] = new Job.RawSplit();
-        splits[i].setLocations(new String[0]);
+        splits[i] = JobSplit.EMPTY_TASK_SPLIT;
       }
       return splits;
     }
     
     @Override
-    protected void createMapTasks(String ignored, Job.RawSplit[] splits) {
+    protected void createMapTasks(String ignored, TaskSplitMetaInfo[] splits) {
       maps = new TaskInProgress[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
         maps[i] = new TaskInProgress(getJobID(), "test", 
@@ -260,7 +259,7 @@
           numSlotsRequired);
     }
 
-    public FakeTaskInProgress(JobID jobId, String jobFile, RawSplit emptySplit,
+    public FakeTaskInProgress(JobID jobId, String jobFile, TaskSplitMetaInfo emptySplit,
         JobTracker jobTracker, JobConf jobConf,
         JobInProgress job, int partition, int numSlotsRequired) {
       super(jobId, jobFile, emptySplit, jobTracker, jobConf, job,

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Mon Dec 21 17:36:44 2009
@@ -141,16 +141,16 @@
        org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
        null)) {
       // specified IndirectInputFormat? Build src list
-      JobClient jClient = new JobClient(job);  
-      Path sysdir = jClient.getSystemDir();
+      JobClient jClient = new JobClient(job);
+      Path tmpDir = new Path(jClient.getFs().getHomeDirectory(), ".staging");
       Random r = new Random();
-      Path indirInputFile = new Path(sysdir,
+      Path indirInputFile = new Path(tmpDir,
           Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
       job.set(
         org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
         indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(
-          sysdir.getFileSystem(job), job, indirInputFile,
+          tmpDir.getFileSystem(job), job, indirInputFile,
           LongWritable.class, Text.class,
           SequenceFile.CompressionType.NONE);
       try {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Mon Dec 21 17:36:44 2009
@@ -40,10 +40,9 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.StaticMapping;
@@ -101,16 +100,14 @@
     }
 
     @Override
-    Job.RawSplit[] createSplits() {
+    TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) {
       // Set all splits to reside on one host. This will ensure that 
       // one tracker gets data local, one gets rack local and two others
       // get non-local maps
-      Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMapTasks];
       String[] splitHosts0 = new String[] { hosts[0] };
       for (int i = 0; i < numMapTasks; i++) {
-        splits[i] = new Job.RawSplit();
-        splits[i].setDataLength(0);
-        splits[i].setLocations(splitHosts0);
+        splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
       }
       return splits;
     }

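The TestJobInProgress hunk above shows the pattern repeated throughout this commit: fabricated Job.RawSplit objects are replaced by JobSplit.TaskSplitMetaInfo, constructed from the split locations, an offset, and the input data length (the argument ordering used consistently in these hunks). A minimal sketch of a test helper built on that constructor (class and method names are illustrative, not part of the commit):

    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;

    class FakeSplits {
      // Pin every fabricated split to a single host so scheduler tests get
      // deterministic data-local / rack-local / non-local decisions.
      static TaskSplitMetaInfo[] onHost(int numMapTasks, String host) {
        String[] locations = new String[] { host };
        TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMapTasks];
        for (int i = 0; i < numMapTasks; i++) {
          // arguments: locations, offset, input data length
          splits[i] = new TaskSplitMetaInfo(locations, 0, 0);
        }
        return splits;
      }
    }
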
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Mon Dec 21 17:36:44 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 public class TestJobQueueTaskScheduler extends TestCase {
   
@@ -81,7 +82,7 @@
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(TaskType.MAP);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
+      Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(), 1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Mon Dec 21 17:36:44 2009
@@ -56,7 +56,8 @@
                                            Path outDir,
                                            String input,
                                            int numMaps,
-                                           int numReduces) throws IOException {
+                                           int numReduces,
+                                           String sysDir) throws IOException {
     FileSystem inFs = inDir.getFileSystem(conf);
     FileSystem outFs = outDir.getFileSystem(conf);
     outFs.delete(outDir, true);
@@ -90,14 +91,13 @@
     assertFalse(FileSystem.get(conf).exists(
       new Path(conf.get(JTConfig.JT_SYSTEM_DIR)))); 
     // Check if the Job Tracker system dir is propagated to client
-    String sysDir = jobClient.getSystemDir().toString();
-    System.out.println("Job sys dir -->" + sysDir);
     assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
     assertTrue(sysDir.contains("custom"));
     return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
   }
 
- static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+ static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir) 
+ throws IOException {
     LOG.info("runWordCount");
     // Run a word count example
     // Keeping tasks that match this pattern
@@ -107,7 +107,7 @@
     result = launchWordCount(jobConf, inDir, outDir,
                              "The quick brown fox\nhas many silly\n" + 
                              "red fox sox\n",
-                             3, 1);
+                             3, 1, sysDir);
     assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                  "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
     // Checking if the Job ran successfully in spite of different system dir config
@@ -128,7 +128,7 @@
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
 
-      runWordCount(mr, mr.createJobConf());
+      runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java Mon Dec 21 17:36:44 2009
@@ -52,7 +52,7 @@
   @SuppressWarnings("deprecation")
   public void testKillCompletedJob() throws IOException, InterruptedException {
     job = new MyFakeJobInProgress(new JobConf(), jobTracker);
-    jobTracker.addJob(job.getJobID(), job);
+    jobTracker.addJob(job.getJobID(), (JobInProgress)job);
     job.status.setRunState(JobStatus.SUCCEEDED);
 
     jobTracker.killJob(job.getJobID());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Mon Dec 21 17:36:44 2009
@@ -20,6 +20,9 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -27,10 +30,19 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  *  Validates map phase progress.
@@ -92,9 +104,9 @@
    */
   class TestMapTask extends MapTask {
     public TestMapTask(String jobFile, TaskAttemptID taskId, 
-        int partition, String splitClass, BytesWritable split,
+        int partition, TaskSplitIndex splitIndex,
         int numSlotsRequired) {
-      super(jobFile, taskId, partition, splitClass, split, numSlotsRequired);
+      super(jobFile, taskId, partition, splitIndex, numSlotsRequired);
     }
     
     /**
@@ -141,16 +153,20 @@
     jobId = taskId.getJobID();
     
     JobContext jContext = new JobContextImpl(job, jobId);
-    Job.RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+    InputFormat<?, ?> input =
+      ReflectionUtils.newInstance(jContext.getInputFormatClass(), job);
 
-    job.setUseNewMapper(true); // use new api
-    for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1
+    List<InputSplit> splits = input.getSplits(jContext);
+    JobSplitWriter.createSplitFiles(new Path(TEST_ROOT_DIR), job, splits);
+    TaskSplitMetaInfo[] splitMetaInfo = 
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, job, new Path(TEST_ROOT_DIR));
+    job.setUseNewMapper(true); // use new api    
+    for (int i = 0; i < splitMetaInfo.length; i++) {// splitMetaInfo.length is 1
       map = new TestMapTask(
           job.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system") +
           jobId + "job.xml",  
           taskId, i,
-          rawSplits[i].getClassName(),
-          rawSplits[i].getBytes(), 1);
+          splitMetaInfo[i].getSplitIndex(), 1);
 
       JobConf localConf = new JobConf(job);
       map.localizeConfiguration(localConf);

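In TestMapProgress above, the old LocalJobRunner.getRawSplits path is replaced by computing splits through the new-API InputFormat, persisting them with JobSplitWriter, and reading the per-task meta info back with SplitMetaInfoReader. A condensed sketch of that sequence, limited to the calls shown in the diff (the helper class and method names are illustrative):

    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.split.JobSplitWriter;
    import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
    import org.apache.hadoop.util.ReflectionUtils;

    class SplitFileRoundTrip {
      // Compute splits with the new API, write job.split and job.splitmetainfo
      // under jobSubmitDir, then read the per-task split meta info back.
      static TaskSplitMetaInfo[] writeAndRead(JobContext jContext, JobConf job,
          JobID jobId, FileSystem fs, Path jobSubmitDir) throws Exception {
        InputFormat<?, ?> input =
          ReflectionUtils.newInstance(jContext.getInputFormatClass(), job);
        List<InputSplit> splits = input.getSplits(jContext);
        JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
        return SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, job, jobSubmitDir);
      }
    }
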
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java Mon Dec 21 17:36:44 2009
@@ -38,15 +38,13 @@
 public class TestMiniMRClasspath extends TestCase {
   
   
-  static String launchWordCount(URI fileSys,
+  static void configureWordCount(FileSystem fs,
                                 String jobTracker,
                                 JobConf conf,
                                 String input,
                                 int numMaps,
-                                int numReduces) throws IOException {
-    final Path inDir = new Path("/testing/wc/input");
-    final Path outDir = new Path("/testing/wc/output");
-    FileSystem fs = FileSystem.get(fileSys, conf);
+                                int numReduces,
+                                Path inDir, Path outDir) throws IOException {
     fs.delete(outDir, true);
     if (!fs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -56,7 +54,7 @@
       file.writeBytes(input);
       file.close();
     }
-    FileSystem.setDefaultUri(conf, fileSys);
+    FileSystem.setDefaultUri(conf, fs.getUri());
     conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
@@ -75,6 +73,16 @@
     conf.setNumReduceTasks(numReduces);
     //pass a job.jar already included in the hadoop build
     conf.setJar("build/test/mapred/testjar/testjob.jar");
+  }
+  
+  static String launchWordCount(URI fileSys, String jobTracker, JobConf conf,
+                                String input, int numMaps, int numReduces) 
+  throws IOException {
+    final Path inDir = new Path("/testing/wc/input");
+    final Path outDir = new Path("/testing/wc/output");
+    FileSystem fs = FileSystem.get(fileSys, conf);
+    configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir, 
+                       outDir);
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Dec 21 17:36:44 2009
@@ -36,12 +36,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -289,8 +291,11 @@
     long hdfsWrite = 
       counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
           Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    long rawSplitBytesRead = 
+      counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
     assertEquals(result.output.length(), hdfsWrite);
-    assertEquals(input.length(), hdfsRead);
+      // the serialized input split is also streamed from HDFS, so add the
+      // raw split bytes read as a correction factor
+    assertEquals(input.length() + rawSplitBytesRead, hdfsRead);
 
     // Run a job with input and output going to localfs even though the 
     // default fs is hdfs.
@@ -334,7 +339,7 @@
     }
   }
   
-  public void testWithDFSWithDefaultPort() throws IOException {
+  public void tesWithDFSWithDefaultPort() throws IOException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;

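In TestMiniMRWithDFS above, the expected HDFS bytes read now include TaskCounter.SPLIT_RAW_BYTES, since each map also streams its serialized input split from HDFS. A small sketch of the adjusted expectation, using the counter lookup from the diff (the helper name is illustrative):

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapreduce.TaskCounter;

    class HdfsReadExpectation {
      // Expected HDFS_BYTES_READ = job input + serialized split data read by maps.
      static long expectedHdfsRead(Counters counters, long inputLength) {
        long rawSplitBytesRead =
          counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
        return inputLength + rawSplitBytesRead;
      }
    }
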
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Mon Dec 21 17:36:44 2009
@@ -23,9 +23,16 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.security.*;
 
 /**
@@ -43,7 +50,11 @@
   }
   
   static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
-    JobConf jobconf = mr.createJobConf();
+    return createJobConf(mr.createJobConf(), ugi);
+  }
+
+  static JobConf createJobConf(JobConf conf, UnixUserGroupInformation ugi) {
+    JobConf jobconf = new JobConf(conf);
     UnixUserGroupInformation.saveToConf(jobconf,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
     return jobconf;
@@ -55,6 +66,50 @@
     fs.setPermission(p, new FsPermission((short)0777));
   }
 
+  // runs a sample job as a user (ugi)
+  RunningJob runJobAsUser(JobConf job, UserGroupInformation ugi) 
+  throws Exception {
+    ClientProtocol jobSubmitClient = 
+      TestSubmitJob.getJobSubmitClient(job, ugi);
+    org.apache.hadoop.mapreduce.JobID id = jobSubmitClient.getNewJobID();
+    
+    InputSplit[] splits = computeJobSplit(JobID.downgrade(id), job);
+    Path jobSubmitDir = new Path(id.toString());
+    FileSystem fs = jobSubmitDir.getFileSystem(job);
+    jobSubmitDir = jobSubmitDir.makeQualified(fs);
+    uploadJobFiles(JobID.downgrade(id), splits, jobSubmitDir, job);
+    
+    jobSubmitClient.submitJob(id, jobSubmitDir.toString());
+    
+    JobClient jc = new JobClient(job);
+    return jc.getJob(JobID.downgrade(id));
+  }
+  
+  // a helper api for split computation
+  private InputSplit[] computeJobSplit(JobID id, JobConf conf) 
+  throws IOException {
+    InputSplit[] splits = 
+      conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
+    conf.setNumMapTasks(splits.length);
+    return splits;
+  }
+
+
+  // a helper api for split submission
+  private void uploadJobFiles(JobID id, InputSplit[] splits,
+                             Path jobSubmitDir, JobConf conf) 
+  throws IOException {
+    Path confLocation = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
+    FileSystem fs = confLocation.getFileSystem(conf);
+    FsPermission perm = new FsPermission((short)0700);
+    
+    // localize conf
+    DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
+    conf.writeXml(confOut);
+    confOut.close();
+  }
+  
   public void testDistinctUsers() throws Exception {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -71,15 +126,32 @@
           UnixUserGroupInformation.login().getUserName(), false); 
       mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
            1, null, null, MR_UGI);
+      String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
 
-      JobConf pi = createJobConf(mr, PI_UGI);
-      TestMiniMRWithDFS.runPI(mr, pi);
-
-      JobConf wc = createJobConf(mr, WC_UGI);
-      TestMiniMRWithDFS.runWordCount(mr, wc);
+      JobConf job1 = mr.createJobConf();
+      String input = "The quick brown fox\nhas many silly\n" 
+                     + "red fox sox\n";
+      Path inDir = new Path("/testing/distinct/input");
+      Path outDir = new Path("/testing/distinct/output");
+      TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1, 
+                                             input, 2, 1, inDir, outDir);
+      job1 = createJobConf(job1, PI_UGI);
+      runJobAsUser(job1, PI_UGI);
+
+      JobConf job2 = mr.createJobConf();
+      Path inDir2 = new Path("/testing/distinct/input2");
+      Path outDir2 = new Path("/testing/distinct/output2");
+      TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2, 
+                                             input, 2, 1, inDir2, outDir2);
+      job2 = createJobConf(job2, WC_UGI);
+      runJobAsUser(job2, WC_UGI);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();}
     }
   }
+  
+  public void testRestartWithDistinctUsers() {
+    
+  }
 }

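TestMiniMRWithDFSWithDistinctUsers now drives submission by hand: it asks the ClientProtocol for a new job ID, stages the split files and job.xml into a per-job directory, and then submits that directory. A condensed sketch of the same flow, restricted to the calls shown in the diff (the class and method names are illustrative, not part of the commit):

    import java.io.DataOutputStream;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.JobSubmissionFiles;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
    import org.apache.hadoop.mapreduce.split.JobSplitWriter;

    class StagedSubmission {
      // Stage splits and job.xml under a job-id-named directory (0700), then
      // hand that directory to the JobTracker via ClientProtocol.submitJob.
      static void submit(ClientProtocol client, JobConf conf, InputSplit[] splits)
          throws Exception {
        JobID id = client.getNewJobID();
        Path jobSubmitDir = new Path(id.toString());
        FileSystem fs = jobSubmitDir.getFileSystem(conf);
        jobSubmitDir = jobSubmitDir.makeQualified(fs);

        JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
        Path confPath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
        DataOutputStream out =
          FileSystem.create(fs, confPath, new FsPermission((short) 0700));
        conf.writeXml(out);
        out.close();

        client.submitJob(id, jobSubmitDir.toString());
      }
    }
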
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Mon Dec 21 17:36:44 2009
@@ -31,8 +31,10 @@
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.StaticMapping;
 
@@ -93,38 +95,36 @@
 
     @Override
     public void initTasks() throws IOException {
-      Job.RawSplit[] splits = createSplits();
-      numMapTasks = splits.length;
-      createMapTasks(null, splits);
-      nonRunningMapCache = createCache(splits, maxLevel);
+      TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+      numMapTasks = taskSplitMetaInfo.length;
+      createMapTasks(null, taskSplitMetaInfo);
+      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
       tasksInited.set(true);
       this.status.setRunState(JobStatus.RUNNING);
 
     }
   
-
-    protected Job.RawSplit[] createSplits() throws IOException {
-      Job.RawSplit[] splits = new Job.RawSplit[numMaps];
+    @Override
+    protected TaskSplitMetaInfo [] createSplits(
+        org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMaps];
       // Hand code for now. 
       // M0,2,3 reside in Host1
       // M1 resides in Host3
       // M4 resides in Host4
       String[] splitHosts0 = new String[] { allHosts[0] };
 
-      for (int i = 0; i < numMaps; i++) {
-        splits[i] = new Job.RawSplit();
-        splits[i].setDataLength(0);
-      }
-
-      splits[0].setLocations(splitHosts0);
-      splits[2].setLocations(splitHosts0);
-      splits[3].setLocations(splitHosts0);
-      
       String[] splitHosts1 = new String[] { allHosts[2] };
-      splits[1].setLocations(splitHosts1);
-
       String[] splitHosts2 = new String[] { allHosts[3] };
-      splits[4].setLocations(splitHosts2);
+      for (int i = 0; i < numMaps; i++) {
+        if (i == 0 || i == 2 || i == 3) {
+          splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
+        } else if (i == 1) {
+          splits[i] = new TaskSplitMetaInfo(splitHosts1, 0, 0);
+        } else if (i == 4) {
+          splits[i] = new TaskSplitMetaInfo(splitHosts2, 0, 0);
+        }
+      }
 
       return splits;
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Dec 21 17:36:44 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
 
 import junit.framework.TestCase;
@@ -48,10 +49,11 @@
   /**
    * Tests the {@link JobTracker} against the exceptions thrown in 
    * {@link JobTracker.RecoveryManager}. It does the following :
-   *  - submits 2 jobs
+   *  - submits 3 jobs
    *  - kills the jobtracker
    *  - Garble job.xml for one job causing it to fail in constructor 
    *    and job.split for another causing it to fail in init.
+   *  - deletes the job temp/submit dir
    *  - restarts the jobtracker
    *  - checks if the jobtraker starts normally
    */
@@ -81,7 +83,7 @@
       LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
       UtilsForTests.waitFor(100);
     }
-    
+        
     JobConf job2 = mr.createJobConf();
     
     UtilsForTests.configureWaitingJobConf(job2, 
@@ -105,26 +107,15 @@
     // delete the job.xml of job #1 causing the job to fail in submit Job
     //while recovery itself
     Path jobFile = 
-      new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
-    LOG.info("Deleting job.xml file : " + jobFile.toString());
+      new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
+    LOG.info("Deleting job token file : " + jobFile.toString());
     fs.delete(jobFile, false); // delete the job.xml file
     
-    // create the job.xml file with 0 bytes
+    // create the job token file with 1 byte
     FSDataOutputStream out = fs.create(jobFile);
     out.write(1);
     out.close();
-
-    // delete the job.split of job #2 causing the job to fail in initTasks
-    Path jobSplitFile = 
-      new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
-    LOG.info("Deleting job.split file : " + jobSplitFile.toString());
-    fs.delete(jobSplitFile, false); // delete the job.split file
     
-    // create the job.split file with 0 bytes
-    out = fs.create(jobSplitFile);
-    out.write(1);
-    out.close();
-
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, true);
     // start the jobtracker

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Mon Dec 21 17:36:44 2009
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.mapred;
 
-import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import junit.framework.TestCase;
 
 public class TestResourceEstimation extends TestCase {
   
@@ -47,8 +46,8 @@
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      Job.RawSplit split = new Job.RawSplit();
-      split.setDataLength(0);
+      JobSplit.TaskSplitMetaInfo split = 
+        new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
     }
@@ -83,8 +82,9 @@
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      Job.RawSplit split = new Job.RawSplit();
-      split.setDataLength(singleMapInputSize);
+      JobSplit.TaskSplitMetaInfo split = 
+        new JobSplit.TaskSplitMetaInfo(new String[0], 0, 
+                                           singleMapInputSize);
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
     }
@@ -95,8 +95,8 @@
     //add one more map task with input size as 0
     TaskStatus ts = new MapTaskStatus();
     ts.setOutputSize(singleMapOutputSize);
-    Job.RawSplit split = new Job.RawSplit();
-    split.setDataLength(0);
+    JobSplit.TaskSplitMetaInfo split = 
+      new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
     TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);
     

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Mon Dec 21 17:36:44 2009
@@ -25,6 +25,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -60,7 +61,7 @@
     @Override
     public synchronized void initTasks() throws IOException {
       super.initTasks();
-      Job.RawSplit emptySplit = new Job.RawSplit();
+      JobSplit.TaskSplitMetaInfo emptySplit = new JobSplit.TaskSplitMetaInfo();
       setup = new TaskInProgress[2];
       setup[0] = new TaskInProgress(getJobID(), "test",  emptySplit,
           jobtracker, getJobConf(), this, numMapTasks + 1, 1);
@@ -109,12 +110,13 @@
     @Override
     public synchronized void initTasks() throws IOException {
       super.initTasks();
-      Job.RawSplit emptySplit = new Job.RawSplit();
+
       final int numSlotsPerTask = 2;
       maps = new TaskInProgress[1];
       reduces = new TaskInProgress[1];
       
-      maps[0] = new FakeTaskInProgress(getJobID(), "test",  emptySplit,
+      maps[0] = new FakeTaskInProgress(getJobID(), "test",  
+          JobSplit.EMPTY_TASK_SPLIT,
           jobtracker, getJobConf(), this, 0, numSlotsPerTask);
       TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
       

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Mon Dec 21 17:36:44 2009
@@ -92,6 +92,8 @@
 
         TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
         TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+        TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, 
+            conf.get(JTConfig.JT_STAGING_AREA_ROOT));
 
         UnixUserGroupInformation MR_UGI = 
           TestMiniMRWithDFSWithDistinctUsers.createUGI(